BUG-2138: Create DistributedShardFrontend 29/46829/76
authorTomas Cere <tcere@cisco.com>
Wed, 7 Sep 2016 11:48:27 +0000 (13:48 +0200)
committerJakub Morvay <jmorvay@cisco.com>
Wed, 8 Mar 2017 09:17:30 +0000 (10:17 +0100)
Use the abstract shard implementations from md-sal to create
a frontend implementation of a cds shard that forwards requests
to backend shards via DistributedDatastoreClient.

Change-Id: I7a3485f414368728e71ab2746c84d7a0f83f1436
Signed-off-by: Tomas Cere <tcere@cisco.com>
55 files changed:
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardManagerSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ModuleShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModification.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationCursor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactoryBuilder.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardFrontend.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractClusterRefActorTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractClusterRefEntityOwnershipTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotListTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardManagerSnapshotTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerGetSnapshotReplyActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactoryTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties

index 02ad5c2..d42b346 100644 (file)
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-remote_${scala.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-distributed-data-experimental_${scala.version}</artifactId>
+      <version>2.4.7</version>
+    </dependency>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-slf4j_${scala.version}</artifactId>
index 00f7572..a3afa22 100644 (file)
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
  *
  * @author Robert Varga
  */
-abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
+public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
     enum State {
         IDLE,
         TX_OPEN,
@@ -194,7 +194,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
      * @throws TransactionChainClosedException if this history is closed
      * @throws IllegalStateException if a previous dependent transaction has not been closed
      */
-    public final ClientTransaction createTransaction() {
+    public ClientTransaction createTransaction() {
         checkNotClosed();
 
         synchronized (this) {
index 4598d95..493eb40 100644 (file)
@@ -25,7 +25,7 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
  * @author Robert Varga
  */
 @Beta
-public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
+public class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
     ClientLocalHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
         super(client, historyId);
     }
index f0ce2fa..efa503a 100644 (file)
@@ -53,7 +53,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * @author Robert Varga
  */
 @Beta
-public final class ClientTransaction extends AbstractClientHandle<AbstractProxyTransaction> {
+public class ClientTransaction extends AbstractClientHandle<AbstractProxyTransaction> {
 
     private ClientTransactionCursor cursor;
 
index 994f076..a24f314 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.base.Preconditions;
 import java.io.ObjectStreamException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nonnull;
 
@@ -40,6 +41,7 @@ public class ShardManagerSnapshot implements Serializable {
     }
 
     private Object readResolve() throws ObjectStreamException {
-        return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList);
+        return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList,
+                Collections.emptyMap());
     }
 }
index 3bf37e0..d5878c6 100644 (file)
@@ -9,12 +9,13 @@
 package org.opendaylight.controller.cluster.datastore.config;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 
 public interface Configuration {
 
@@ -36,7 +37,7 @@ public interface Configuration {
     /**
      * Return the shard name corresponding to the prefix, or null if none is configured.
      */
-    @Nullable String getShardNameForPrefix(@Nonnull YangInstanceIdentifier prefix);
+    @Nullable String getShardNameForPrefix(@Nonnull DOMDataTreeIdentifier prefix);
 
     /**
      * Returns the member replicas for the given shard name.
@@ -63,6 +64,18 @@ public interface Configuration {
      */
     void addPrefixShardConfiguration(@Nonnull PrefixShardConfiguration config);
 
+    /**
+     * Removes a shard configuration for the specified prefix.
+     */
+    void removePrefixShardConfiguration(@Nonnull DOMDataTreeIdentifier prefix);
+
+    /**
+     * Returns the configuration for all configured prefix shards.
+     *
+     * @return An immutable copy of the currently configured prefix shards.
+     */
+    Map<DOMDataTreeIdentifier, PrefixShardConfiguration> getAllPrefixShardConfigurations();
+
     /**
      * Returns a unique set of all member names configured for all shards.
      */
@@ -86,5 +99,5 @@ public interface Configuration {
     /**
      * Returns the ShardStrategy for the given prefix or null if the prefix is not found.
      */
-    @Nullable ShardStrategy getStrategyForPrefix(@Nonnull YangInstanceIdentifier prefix);
+    @Nullable ShardStrategy getStrategyForPrefix(@Nonnull DOMDataTreeIdentifier prefix);
 }
index 59acdbd..c1f687b 100644 (file)
@@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardSt
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 // TODO clean this up once we get rid of module based configuration, prefix one should be alot simpler
@@ -35,7 +36,7 @@ public class ConfigurationImpl implements Configuration {
     private volatile Map<String, ModuleConfig> moduleConfigMap;
 
     // TODO should this be initialized with something? on restart we should restore the shards from configuration?
-    private volatile Map<YangInstanceIdentifier, PrefixShardConfiguration> prefixConfigMap = Collections.emptyMap();
+    private volatile Map<DOMDataTreeIdentifier, PrefixShardConfiguration> prefixConfigMap = Collections.emptyMap();
 
     // Look up maps to speed things up
 
@@ -121,21 +122,22 @@ public class ConfigurationImpl implements Configuration {
 
     @Nullable
     @Override
-    public String getShardNameForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+    public String getShardNameForPrefix(@Nonnull final DOMDataTreeIdentifier prefix) {
         Preconditions.checkNotNull(prefix, "prefix should not be null");
 
-        Entry<YangInstanceIdentifier, PrefixShardConfiguration> bestMatchEntry =
-                new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null);
+        Entry<DOMDataTreeIdentifier, PrefixShardConfiguration> bestMatchEntry =
+                new SimpleEntry<>(
+                        new DOMDataTreeIdentifier(prefix.getDatastoreType(), YangInstanceIdentifier.EMPTY), null);
 
-        for (Entry<YangInstanceIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
-            if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size()
-                    > bestMatchEntry.getKey().getPathArguments().size()) {
+        for (Entry<DOMDataTreeIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
+            if (entry.getKey().contains(prefix) && entry.getKey().getRootIdentifier().getPathArguments().size()
+                    > bestMatchEntry.getKey().getRootIdentifier().getPathArguments().size()) {
                 bestMatchEntry = entry;
             }
         }
 
         //TODO we really should have mapping based on prefix instead of Strings
-        return ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier());
+        return ClusterUtils.getCleanShardName(bestMatchEntry.getKey().getRootIdentifier());
     }
 
     @Override
@@ -192,14 +194,37 @@ public class ConfigurationImpl implements Configuration {
     @Override
     public void addPrefixShardConfiguration(@Nonnull final PrefixShardConfiguration config) {
         Preconditions.checkNotNull(config, "PrefixShardConfiguration cannot be null");
-        updatePrefixConfigMap(config);
+        addPrefixConfig(config);
         allShardNames = ImmutableSet.<String>builder().addAll(allShardNames)
                 .add(ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())).build();
     }
 
-    private void updatePrefixConfigMap(final PrefixShardConfiguration config) {
-        final Map<YangInstanceIdentifier, PrefixShardConfiguration> newPrefixConfigMap = new HashMap<>(prefixConfigMap);
-        newPrefixConfigMap.put(config.getPrefix().getRootIdentifier(), config);
+    @Override
+    public void removePrefixShardConfiguration(@Nonnull final DOMDataTreeIdentifier prefix) {
+        Preconditions.checkNotNull(prefix, "Prefix cannot be null");
+
+        removePrefixConfig(prefix);
+
+        final HashSet<String> temp = new HashSet<>(allShardNames);
+        temp.remove(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+
+        allShardNames = ImmutableSet.copyOf(temp);
+    }
+
+    @Override
+    public Map<DOMDataTreeIdentifier, PrefixShardConfiguration> getAllPrefixShardConfigurations() {
+        return ImmutableMap.copyOf(prefixConfigMap);
+    }
+
+    private void addPrefixConfig(final PrefixShardConfiguration config) {
+        final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newPrefixConfigMap = new HashMap<>(prefixConfigMap);
+        newPrefixConfigMap.put(config.getPrefix(), config);
+        prefixConfigMap = ImmutableMap.copyOf(newPrefixConfigMap);
+    }
+
+    private void removePrefixConfig(final DOMDataTreeIdentifier prefix) {
+        final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newPrefixConfigMap = new HashMap<>(prefixConfigMap);
+        newPrefixConfigMap.remove(prefix);
         prefixConfigMap = ImmutableMap.copyOf(newPrefixConfigMap);
     }
 
@@ -246,15 +271,16 @@ public class ConfigurationImpl implements Configuration {
     }
 
     @Override
-    public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+    public ShardStrategy getStrategyForPrefix(@Nonnull final DOMDataTreeIdentifier prefix) {
         Preconditions.checkNotNull(prefix, "Prefix cannot be null");
         // FIXME using prefix tables like in mdsal will be better
-        Entry<YangInstanceIdentifier, PrefixShardConfiguration> bestMatchEntry =
-                new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null);
+        Entry<DOMDataTreeIdentifier, PrefixShardConfiguration> bestMatchEntry =
+                new SimpleEntry<>(
+                        new DOMDataTreeIdentifier(prefix.getDatastoreType(), YangInstanceIdentifier.EMPTY), null);
 
-        for (Entry<YangInstanceIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
-            if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size()
-                    > bestMatchEntry.getKey().getPathArguments().size()) {
+        for (Entry<DOMDataTreeIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
+            if (entry.getKey().contains(prefix) && entry.getKey().getRootIdentifier().getPathArguments().size()
+                    > bestMatchEntry.getKey().getRootIdentifier().getPathArguments().size()) {
                 bestMatchEntry = entry;
             }
         }
@@ -262,8 +288,9 @@ public class ConfigurationImpl implements Configuration {
         if (bestMatchEntry.getValue() == null) {
             return null;
         }
-        return new PrefixShardStrategy(
-                ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier()), this);
+        return new PrefixShardStrategy(ClusterUtils
+                .getCleanShardName(bestMatchEntry.getKey().getRootIdentifier()),
+                bestMatchEntry.getKey().getRootIdentifier());
     }
 
     private void updateModuleConfigMap(final ModuleConfig moduleConfig) {
index d18e05b..6b79d54 100644 (file)
@@ -8,15 +8,19 @@
 
 package org.opendaylight.controller.cluster.datastore.config;
 
+import akka.cluster.ddata.ReplicatedData;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.HashSet;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 
 /**
  * Configuration for prefix based shards.
  */
-public class PrefixShardConfiguration implements Serializable {
+public class PrefixShardConfiguration implements ReplicatedData, Serializable {
     private static final long serialVersionUID = 1L;
 
     private final DOMDataTreeIdentifier prefix;
@@ -26,9 +30,9 @@ public class PrefixShardConfiguration implements Serializable {
     public PrefixShardConfiguration(final DOMDataTreeIdentifier prefix,
                                     final String shardStrategyName,
                                     final Collection<MemberName> shardMemberNames) {
-        this.prefix = prefix;
-        this.shardStrategyName = shardStrategyName;
-        this.shardMemberNames = shardMemberNames;
+        this.prefix = Preconditions.checkNotNull(prefix);
+        this.shardStrategyName = Preconditions.checkNotNull(shardStrategyName);
+        this.shardMemberNames = ImmutableSet.copyOf(shardMemberNames);
     }
 
     public DOMDataTreeIdentifier getPrefix() {
@@ -42,4 +46,34 @@ public class PrefixShardConfiguration implements Serializable {
     public Collection<MemberName> getShardMemberNames() {
         return shardMemberNames;
     }
+
+    @Override
+    public String toString() {
+        return "PrefixShardConfiguration{"
+                + "prefix=" + prefix
+                + ", shardStrategyName='"
+                + shardStrategyName + '\''
+                + ", shardMemberNames=" + shardMemberNames
+                + '}';
+    }
+
+    public String toDataMapKey() {
+        return "prefix=" + prefix;
+    }
+
+    @Override
+    public ReplicatedData merge(final ReplicatedData replicatedData) {
+        if (!(replicatedData instanceof PrefixShardConfiguration)) {
+            throw new IllegalStateException("replicatedData expected to be instance of PrefixShardConfiguration");
+        }
+        final PrefixShardConfiguration entry = (PrefixShardConfiguration) replicatedData;
+        if (!entry.getPrefix().equals(prefix)) {
+            // this should never happen since the key is the prefix
+            // if it does just return current?
+            return this;
+        }
+        final HashSet<MemberName> members = new HashSet<>(shardMemberNames);
+        members.addAll(entry.getShardMemberNames());
+        return new PrefixShardConfiguration(prefix, shardStrategyName, members);
+    }
 }
index 84585f4..dbd0310 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import com.google.common.base.Preconditions;
 import java.util.Optional;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 
@@ -24,13 +25,13 @@ public class ShardLeaderStateChanged extends LeaderStateChanged {
 
     private final DataTree localShardDataTree;
 
-    public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId,
+    public ShardLeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId,
             @Nonnull DataTree localShardDataTree, short leaderPayloadVersion) {
         super(memberId, leaderId, leaderPayloadVersion);
         this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
     }
 
-    public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId,
+    public ShardLeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId,
             short leaderPayloadVersion) {
         super(memberId, leaderId, leaderPayloadVersion);
         this.localShardDataTree = null;
index f6be1b8..9384675 100644 (file)
@@ -8,14 +8,19 @@
 package org.opendaylight.controller.cluster.datastore.persisted;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 
 /**
  * Represents the persisted snapshot state for the ShardManager.
@@ -47,6 +52,12 @@ public class ShardManagerSnapshot implements Serializable {
             for (String shard: snapshot.shardList) {
                 out.writeObject(shard);
             }
+
+            out.writeInt(snapshot.prefixShardConfiguration.size());
+            for (Map.Entry prefixShardConfigEntry : snapshot.prefixShardConfiguration.entrySet()) {
+                out.writeObject(prefixShardConfigEntry.getKey());
+                out.writeObject(prefixShardConfigEntry.getValue());
+            }
         }
 
         @Override
@@ -57,7 +68,14 @@ public class ShardManagerSnapshot implements Serializable {
                 shardList.add((String) in.readObject());
             }
 
-            snapshot = new ShardManagerSnapshot(shardList);
+            size = in.readInt();
+            Map<DOMDataTreeIdentifier, PrefixShardConfiguration> prefixShardConfiguration = new HashMap<>(size);
+            for (int i = 0; i < size; i++) {
+                prefixShardConfiguration.put((DOMDataTreeIdentifier) in.readObject(),
+                        (PrefixShardConfiguration) in.readObject());
+            }
+
+            snapshot = new ShardManagerSnapshot(shardList, prefixShardConfiguration);
         }
 
         private Object readResolve() {
@@ -66,9 +84,12 @@ public class ShardManagerSnapshot implements Serializable {
     }
 
     private final List<String> shardList;
+    private final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> prefixShardConfiguration;
 
-    public ShardManagerSnapshot(@Nonnull final List<String> shardList) {
+    public ShardManagerSnapshot(@Nonnull final List<String> shardList,
+                                final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> prefixShardConfiguration) {
         this.shardList = ImmutableList.copyOf(shardList);
+        this.prefixShardConfiguration = ImmutableMap.copyOf(prefixShardConfiguration);
     }
 
     public List<String> getShardList() {
index b2c9d30..9085027 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
+import static akka.actor.ActorRef.noSender;
 import static akka.pattern.Patterns.ask;
 
 import akka.actor.ActorRef;
@@ -21,6 +22,10 @@ import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
 import akka.cluster.ClusterEvent.MemberWeaklyUp;
 import akka.cluster.Member;
+import akka.cluster.ddata.DistributedData;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.Replicator.Changed;
+import akka.cluster.ddata.Replicator.Subscribe;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
@@ -33,6 +38,8 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -47,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -102,6 +110,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,6 +166,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String persistenceId;
 
+    private final ActorRef replicator;
+
     ShardManager(AbstractShardManagerCreator<?> builder) {
         this.cluster = builder.getCluster();
         this.configuration = builder.getConfiguration();
@@ -180,6 +191,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "shard-manager-" + this.type,
                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
         shardManagerMBean.registerMBean();
+
+        replicator = DistributedData.get(context().system()).replicator();
+
+    }
+
+    public void preStart() {
+        LOG.info("Starting Shardmanager {}", persistenceId);
+
+        final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
+                new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
+        replicator.tell(subscribe, noSender());
     }
 
     @Override
@@ -261,6 +283,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onGetLocalShardIds();
         } else if (message instanceof RunnableMessage) {
             ((RunnableMessage)message).run();
+        } else if (message instanceof Changed) {
+            onConfigChanged((Changed) message);
         } else {
             unknownMessage(message);
         }
@@ -316,6 +340,84 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
+        LOG.debug("{}, ShardManager {} received config changed {}",
+                cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries());
+
+        final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
+
+        final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
+                changedConfig.values().stream().collect(
+                        Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity()));
+
+        resolveConfig(newConfig);
+    }
+
+    private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
+        LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}",
+                cluster.getCurrentMemberName(), persistenceId, newConfig);
+
+        newConfig.forEach((prefix, config) ->
+                LOG.debug("{} ShardManager : {}, received shard config "
+                        + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config));
+
+        final SetView<DOMDataTreeIdentifier> removedConfigs =
+                Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet());
+
+        // resolve removals
+
+        resolveRemovals(removedConfigs);
+
+        final SetView<DOMDataTreeIdentifier> addedConfigs =
+                Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet());
+        // resolve additions
+
+        resolveAdditions(addedConfigs, newConfig);
+        // iter through to update existing shards, either start/stop replicas or update the shard
+        // to check for more peers
+        resolveUpdates(Collections.emptySet());
+    }
+
+    private void resolveRemovals(final Set<DOMDataTreeIdentifier> removedConfigs) {
+        LOG.debug("{} ShardManager : {}, resolving removed configs : {}",
+                cluster.getCurrentMemberName(), persistenceId, removedConfigs);
+
+        removedConfigs.forEach(id -> doRemovePrefixedShard(id));
+    }
+
+    private void resolveAdditions(final Set<DOMDataTreeIdentifier> addedConfigs,
+                                  final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
+        LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs);
+
+        addedConfigs.forEach(id -> doCreatePrefixedShard(configs.get(id)));
+    }
+
+    private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
+        LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs);
+    }
+
+    private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) {
+        LOG.debug("{} ShardManager : {}, removing prefix shard: {}",
+                cluster.getCurrentMemberName(), persistenceId, prefix);
+        final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
+        final ShardInformation shard = localShards.remove(shardId.getShardName());
+
+        configuration.removePrefixShardConfiguration(prefix);
+
+        if (shard == null) {
+            LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix);
+            return;
+        }
+
+        if (shard.getActor() != null) {
+            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
+            shard.getActor().tell(Shutdown.INSTANCE, self());
+        }
+        LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(),
+                persistenceId(), shardId.getShardName());
+        persistShardList();
+    }
+
     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
             String leaderPath) {
         shardReplicaOperationsInProgress.remove(shardId.getShardName());
@@ -468,42 +570,41 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
-        final PrefixShardConfiguration config = createPrefixedShard.getConfig();
+        doCreatePrefixedShard(createPrefixedShard.getConfig());
+        // do not replicate on this level
+    }
+
+    private void doCreatePrefixedShard(final PrefixShardConfiguration config) {
+        LOG.debug("doCreatePrefixShard : {}", config.getPrefix());
 
         final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
-                createPrefixedShard.getConfig().getPrefix());
+                config.getPrefix());
         final String shardName = shardId.getShardName();
 
         configuration.addPrefixShardConfiguration(config);
 
-        DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
-
-        if (shardDatastoreContext == null) {
-            final Builder builder = newShardDatastoreContextBuilder(shardName);
-            builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
-                    .storeRoot(config.getPrefix().getRootIdentifier());
-            shardDatastoreContext = builder.build();
-        } else {
-            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
-                    peerAddressResolver).build();
-        }
-
-        final boolean shardWasInRecoveredSnapshot = currentSnapshot != null
-                && currentSnapshot.getShardList().contains(shardName);
+        final Builder builder = newShardDatastoreContextBuilder(shardName);
+        builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+                .storeRoot(config.getPrefix().getRootIdentifier());
+        DatastoreContext shardDatastoreContext = builder.build();
 
         final Map<String, String> peerAddresses = Collections.emptyMap();
         final boolean isActiveMember = true;
-        LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
-                persistenceId(), shardId, peerAddresses, isActiveMember);
+        LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: "
+                        + "{}, peerAddresses: {}, isActiveMember: {}",
+                shardId, persistenceId(), config.getShardMemberNames(),
+                peerAddresses, isActiveMember);
 
         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
-                shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
+                shardDatastoreContext, Shard.builder(), peerAddressResolver);
         info.setActiveMember(isActiveMember);
         localShards.put(info.getShardName(), info);
 
         if (schemaContext != null) {
             info.setActor(newShardActor(schemaContext, info));
         }
+
+        persistShardList();
     }
 
     private void doCreateShard(final CreateShard createShard) {
@@ -1094,9 +1195,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param shardName the shard name
      */
     private Map<String, String> getPeerAddresses(String shardName) {
-        Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
-        Map<String, String> peerAddresses = new HashMap<>();
+        final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
+        return getPeerAddresses(shardName, members);
+    }
 
+    private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
+        Map<String, String> peerAddresses = new HashMap<>();
         MemberName currentMemberName = this.cluster.getCurrentMemberName();
 
         for (MemberName memberName : members) {
@@ -1371,11 +1475,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
         }
         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
-        saveSnapshot(updateShardManagerSnapshot(shardList));
+        saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
     }
 
-    private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
-        currentSnapshot = new ShardManagerSnapshot(shardList);
+    private ShardManagerSnapshot updateShardManagerSnapshot(
+            final List<String> shardList,
+            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
+        currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
         return currentSnapshot;
     }
 
index 46fccc7..9899aeb 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nonnull;
 
@@ -43,7 +44,8 @@ public final class ShardManagerSnapshot implements Serializable {
     }
 
     private Object readResolve() {
-        return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList);
+        return new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(shardList,
+                Collections.emptyMap());
     }
 
     @Override
index a0712b6..8628b56 100644 (file)
@@ -31,4 +31,9 @@ public final class DefaultShardStrategy implements ShardStrategy {
     public String findShard(YangInstanceIdentifier path) {
         return DEFAULT_SHARD;
     }
+
+    @Override
+    public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+        return YangInstanceIdentifier.EMPTY;
+    }
 }
index c90baf2..64e24a9 100644 (file)
@@ -28,4 +28,11 @@ public class ModuleShardStrategy implements ShardStrategy {
         String shardName = configuration.getShardNameForModule(moduleName);
         return shardName != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD;
     }
+
+    @Override
+    public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+        return YangInstanceIdentifier.EMPTY;
+    }
+
+
 }
index 1e08a98..25e1160 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.shardstrategy;
 
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 /**
@@ -19,16 +18,21 @@ public class PrefixShardStrategy implements ShardStrategy {
     public static final String NAME = "prefix";
 
     private final String shardName;
-    private final Configuration configuration;
+    private final YangInstanceIdentifier prefix;
 
-    public PrefixShardStrategy(final String shardName, final Configuration configuration) {
-        this.shardName = shardName;
-        this.configuration = configuration;
+    public PrefixShardStrategy(final String shardName,
+                               final YangInstanceIdentifier prefix) {
+        this.shardName = shardName != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD;
+        this.prefix = prefix;
     }
 
     @Override
     public String findShard(final YangInstanceIdentifier path) {
-        final String shardNameForPrefix = configuration.getShardNameForPrefix(path);
-        return shardNameForPrefix != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD;
+        return shardName;
+    }
+
+    @Override
+    public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+        return prefix;
     }
 }
index 496069d..0a4c54b 100644 (file)
@@ -25,4 +25,11 @@ public interface ShardStrategy {
      * @return the corresponding shard name.
      */
     String findShard(YangInstanceIdentifier path);
+
+    /**
+     * Get the prefix of the shard that contains the data pointed to by the specified path.
+     * @param path the location of the data in the logical tree.
+     * @return the corresponding shards prefix.
+     */
+    YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path);
 }
index a97adc2..7244994 100644 (file)
@@ -10,16 +10,20 @@ package org.opendaylight.controller.cluster.datastore.shardstrategy;
 
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class ShardStrategyFactory {
     private static final String UNKNOWN_MODULE_NAME = "unknown";
 
     private final Configuration configuration;
+    private final LogicalDatastoreType logicalStoreType;
 
-    public ShardStrategyFactory(final Configuration configuration) {
+    public ShardStrategyFactory(final Configuration configuration, LogicalDatastoreType logicalStoreType) {
         Preconditions.checkState(configuration != null, "configuration should not be missing");
         this.configuration = configuration;
+        this.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
     }
 
     public ShardStrategy getStrategy(final YangInstanceIdentifier path) {
@@ -30,7 +34,8 @@ public class ShardStrategyFactory {
         final ShardStrategy shardStrategy = configuration.getStrategyForModule(moduleName);
         if (shardStrategy == null) {
             // retry with prefix based sharding
-            final ShardStrategy strategyForPrefix = configuration.getStrategyForPrefix(path);
+            final ShardStrategy strategyForPrefix =
+                    configuration.getStrategyForPrefix(new DOMDataTreeIdentifier(logicalStoreType, path));
             if (strategyForPrefix == null) {
                 return DefaultShardStrategy.getInstance();
             }
index d3eb6f3..e35dab8 100644 (file)
@@ -50,6 +50,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -129,7 +130,10 @@ public class ActorContext {
         this.datastoreContext = datastoreContext;
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
         this.primaryShardInfoCache = primaryShardInfoCache;
-        this.shardStrategyFactory = new ShardStrategyFactory(configuration);
+
+        final LogicalDatastoreType convertedType =
+                LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
+        this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);
 
         setCachedProperties();
 
index dba1761..4b60f61 100644 (file)
@@ -8,19 +8,41 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import akka.cluster.ddata.Key;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORMapKey;
+import java.util.Map;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
 
 /**
  * Utils for encoding prefix shard name.
  */
 public class ClusterUtils {
 
+    // key for replicated configuration key
+    public static final Key<ORMap<PrefixShardConfiguration>> CONFIGURATION_KEY =
+            ORMapKey.create("prefix-shard-configuration");
+
     public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) {
-        return ShardIdentifier
-                .create(getCleanShardName(prefix.getRootIdentifier()), memberName, prefix.getDatastoreType().name());
+        final String type;
+        switch (prefix.getDatastoreType()) {
+            case OPERATIONAL:
+                type = "operational";
+                break;
+            case CONFIGURATION:
+                type = "config";
+                break;
+            default:
+                type = prefix.getDatastoreType().name();
+        }
+
+        return ShardIdentifier.create(getCleanShardName(prefix.getRootIdentifier()), memberName, type);
     }
 
     /**
@@ -31,10 +53,25 @@ public class ClusterUtils {
      * @return encoded name that doesn't contain characters that cannot be in actor path.
      */
     public static String getCleanShardName(final YangInstanceIdentifier path) {
+        if (path.isEmpty()) {
+            return "default";
+        }
+
         final StringBuilder builder = new StringBuilder();
         // TODO need a better mapping that includes namespace, but we'll need to cleanup the string beforehand
+        // we have to fight both javax and akka url path restrictions..
         path.getPathArguments().forEach(p -> {
             builder.append(p.getNodeType().getLocalName());
+            if (p instanceof NodeIdentifierWithPredicates) {
+                builder.append("-key_");
+                final Map<QName, Object> key = ((NodeIdentifierWithPredicates) p).getKeyValues();
+                key.entrySet().forEach(e -> {
+                    builder.append(e.getKey().getLocalName());
+                    builder.append(e.getValue());
+                    builder.append("-");
+                });
+                builder.append("_");
+            }
             builder.append("!");
         });
         return builder.toString();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java
new file mode 100644 (file)
index 0000000..8edfdde
--- /dev/null
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
+import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard;
+import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification;
+import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy implementation of a shard that creates forwarding producers to the backend shard.
+ */
+class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardFrontend.class);
+
+    private final DataStoreClient client;
+    private final DOMDataTreeIdentifier shardRoot;
+    @GuardedBy("this")
+    private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
+    @GuardedBy("this")
+    private final List<ShardProxyProducer> producers = new ArrayList<>();
+    private final DistributedDataStore distributedDataStore;
+
+    DistributedShardFrontend(final DistributedDataStore distributedDataStore,
+                             final DataStoreClient client,
+                             final DOMDataTreeIdentifier shardRoot) {
+        this.distributedDataStore = Preconditions.checkNotNull(distributedDataStore);
+        this.client = Preconditions.checkNotNull(client);
+        this.shardRoot = Preconditions.checkNotNull(shardRoot);
+    }
+
+    @Override
+    public synchronized DOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> paths) {
+        for (final DOMDataTreeIdentifier prodPrefix : paths) {
+            Preconditions.checkArgument(paths.contains(prodPrefix), "Prefix %s is not contained under shard root",
+                    prodPrefix, paths);
+        }
+
+        final ShardProxyProducer ret =
+                new ShardProxyProducer(shardRoot, paths, client, createModificationFactory(paths));
+        producers.add(ret);
+        return ret;
+    }
+
+    @Override
+    public synchronized void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+        LOG.debug("{} : Child shard attached at {}", shardRoot, prefix);
+        Preconditions.checkArgument(child != this, "Attempted to attach child %s onto self", this);
+        addChildShard(prefix, child);
+        updateProducers();
+    }
+
+    @Override
+    public synchronized void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+        LOG.debug("{} : Child shard detached at {}", shardRoot, prefix);
+        childShards.remove(prefix);
+        updateProducers();
+        // TODO we should grab the dataTreeSnapshot that's in the shard and apply it to this shard
+    }
+
+    private void addChildShard(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+        Preconditions.checkArgument(child instanceof WriteableDOMDataTreeShard);
+        childShards.put(prefix, new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child));
+    }
+
+    DistributedShardModificationFactory createModificationFactory(final Collection<DOMDataTreeIdentifier> prefixes) {
+        // TODO this could be abstract
+        final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
+
+        for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
+            for (final ChildShardContext maybeAffected : childShards.values()) {
+                final DOMDataTreeIdentifier bindPath;
+                if (producerPrefix.contains(maybeAffected.getPrefix())) {
+                    bindPath = maybeAffected.getPrefix();
+                } else if (maybeAffected.getPrefix().contains(producerPrefix)) {
+                    // Bound path is inside subshard
+                    bindPath = producerPrefix;
+                } else {
+                    continue;
+                }
+
+                SubshardProducerSpecification spec = affectedSubshards.get(maybeAffected.getPrefix());
+                if (spec == null) {
+                    spec = new SubshardProducerSpecification(maybeAffected);
+                    affectedSubshards.put(maybeAffected.getPrefix(), spec);
+                }
+                spec.addPrefix(bindPath);
+            }
+        }
+
+        final DistributedShardModificationFactoryBuilder builder =
+                new DistributedShardModificationFactoryBuilder(shardRoot);
+        for (final SubshardProducerSpecification spec : affectedSubshards.values()) {
+            final ForeignShardModificationContext foreignContext =
+                    new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
+            builder.addSubshard(foreignContext);
+            builder.addSubshard(spec.getPrefix(), foreignContext);
+        }
+
+        return builder.build();
+    }
+
+    private void updateProducers() {
+        for (final ShardProxyProducer producer : producers) {
+            producer.setModificationFactory(createModificationFactory(producer.getPrefixes()));
+        }
+    }
+
+    @Nonnull
+    @Override
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+            final YangInstanceIdentifier treeId, final L listener) {
+        throw new UnsupportedOperationException("Listener registration not supported");
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModification.java
new file mode 100644 (file)
index 0000000..2bd0dae
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
+import org.opendaylight.mdsal.dom.spi.shard.WritableNodeOperation;
+import org.opendaylight.mdsal.dom.spi.shard.WriteCursorStrategy;
+import org.opendaylight.mdsal.dom.spi.shard.WriteableModificationNode;
+import org.opendaylight.mdsal.dom.spi.shard.WriteableNodeWithSubshard;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+
+/**
+ * Shard modification that consists of the whole shard context, provides cursors which correctly delegate to subshards
+ * if any are present.
+ */
+public class DistributedShardModification extends WriteableNodeWithSubshard {
+
+    private final DistributedShardModificationContext context;
+    private final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards;
+
+    public DistributedShardModification(final DistributedShardModificationContext context,
+                                        final Map<PathArgument, WriteableModificationNode> subshards,
+                                        final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards) {
+        super(subshards);
+        this.context = Preconditions.checkNotNull(context);
+        this.childShards = Preconditions.checkNotNull(childShards);
+    }
+
+    @Override
+    public PathArgument getIdentifier() {
+        return context.getIdentifier().getRootIdentifier().getLastPathArgument();
+    }
+
+    @Override
+    public WriteCursorStrategy createOperation(final DOMDataTreeWriteCursor parentCursor) {
+        return new WritableNodeOperation(this, context.cursor()) {
+            @Override
+            public void exit() {
+                throw new IllegalStateException("Can not exit data tree root");
+            }
+        };
+    }
+
+    void cursorClosed() {
+        context.closeCursor();
+    }
+
+    DOMStoreThreePhaseCommitCohort seal() {
+        childShards.values().forEach(ForeignShardModificationContext::ready);
+        return context.ready();
+    }
+
+    DOMDataTreeIdentifier getPrefix() {
+        return context.getIdentifier();
+    }
+
+    Map<DOMDataTreeIdentifier, ForeignShardModificationContext> getChildShards() {
+        return childShards;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationContext.java
new file mode 100644 (file)
index 0000000..9d51684
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+
+/**
+ * The context for a single shards modification, keeps a ClientTransaction so it can route requests correctly.
+ */
+public class DistributedShardModificationContext {
+
+    private ClientTransaction transaction;
+    private DOMDataTreeIdentifier identifier;
+    private DOMDataTreeWriteCursor cursor;
+
+    public DistributedShardModificationContext(final ClientTransaction transaction,
+                                               final DOMDataTreeIdentifier identifier) {
+        this.transaction = transaction;
+        this.identifier = identifier;
+    }
+
+    public DOMDataTreeIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    DOMDataTreeWriteCursor cursor() {
+        if (cursor == null) {
+            cursor = transaction.openCursor();
+        }
+
+        return cursor;
+    }
+
+    DOMStoreThreePhaseCommitCohort ready() {
+        if (cursor != null) {
+            cursor.close();
+            cursor = null;
+        }
+
+        return transaction.ready();
+    }
+
+    void closeCursor() {
+        if (cursor != null) {
+            cursor.close();
+            cursor = null;
+        }
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationCursor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationCursor.java
new file mode 100644 (file)
index 0000000..37ccf60
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import org.opendaylight.mdsal.dom.spi.shard.AbstractDataModificationCursor;
+import org.opendaylight.mdsal.dom.spi.shard.WriteCursorStrategy;
+
+/**
+ * Internal cursor implementation consisting of WriteCursorStrategies which forwards writes to foreign modifications
+ * if any.
+ */
+public class DistributedShardModificationCursor extends AbstractDataModificationCursor<DistributedShardModification> {
+
+    private ShardProxyTransaction parent;
+
+    public DistributedShardModificationCursor(final DistributedShardModification root,
+                                              final ShardProxyTransaction parent) {
+        super(root);
+        this.parent = parent;
+    }
+
+    @Override
+    protected WriteCursorStrategy getRootOperation(final DistributedShardModification root) {
+        return root.createOperation(null);
+    }
+
+    @Override
+    public void close() {
+        parent.cursorClosed();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactory.java
new file mode 100644 (file)
index 0000000..8fc1f48
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
+import org.opendaylight.mdsal.dom.spi.shard.WriteableModificationNode;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+
+/**
+ * Factory for {@link DistributedShardModification}.
+ */
+public final class DistributedShardModificationFactory {
+    private final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards;
+    private final Map<PathArgument, WriteableModificationNode> children;
+    private final DOMDataTreeIdentifier root;
+
+    DistributedShardModificationFactory(final DOMDataTreeIdentifier root,
+                                        final Map<PathArgument, WriteableModificationNode> children,
+                                        final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards) {
+        this.root = Preconditions.checkNotNull(root);
+        this.children = ImmutableMap.copyOf(children);
+        this.childShards = ImmutableMap.copyOf(childShards);
+    }
+
+    @VisibleForTesting
+    Map<PathArgument, WriteableModificationNode> getChildren() {
+        return children;
+    }
+
+    @VisibleForTesting
+    Map<DOMDataTreeIdentifier, ForeignShardModificationContext> getChildShards() {
+        return childShards;
+    }
+
+    DistributedShardModification createModification(final ClientTransaction transaction) {
+        return new DistributedShardModification(
+                new DistributedShardModificationContext(transaction, root), children, childShards);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactoryBuilder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardModificationFactoryBuilder.java
new file mode 100644 (file)
index 0000000..15459ce
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.store.inmemory.ShardDataModificationFactoryBuilder;
+
+/**
+ * Builder for {@link DistributedShardModificationFactory}.
+ */
+public class DistributedShardModificationFactoryBuilder
+        extends ShardDataModificationFactoryBuilder<DistributedShardModificationFactory> {
+
+
+    public DistributedShardModificationFactoryBuilder(final DOMDataTreeIdentifier root) {
+        super(root);
+    }
+
+    @Override
+    public DistributedShardModificationFactory build() {
+        return new DistributedShardModificationFactory(root, buildChildren(), childShards);
+    }
+}
index 91b479d..5575438 100644 (file)
@@ -14,30 +14,36 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.ForwardingObject;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.concurrent.CompletionException;
+import java.util.EnumMap;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
@@ -48,12 +54,15 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
+import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
 import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
+import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import scala.collection.JavaConverters;
 
 /**
  * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
@@ -79,6 +88,12 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     private final ActorRef shardedDataTreeActor;
     private final MemberName memberName;
 
+    private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
+            DOMDataTreePrefixTable.create();
+
+    private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
+            new EnumMap<>(LogicalDatastoreType.class);
+
     public DistributedShardedDOMDataTree(final ActorSystem actorSystem,
                                          final DistributedDataStore distributedOperDatastore,
                                          final DistributedDataStore distributedConfigDatastore) {
@@ -89,8 +104,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
 
         shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
                 new ShardedDataTreeActorCreator()
-                        .setDataTreeService(shardedDOMDataTree)
-                        .setShardingService(shardedDOMDataTree)
+                        .setShardingService(this)
                         .setActorSystem(actorSystem)
                         .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
                         .setDistributedConfigDatastore(distributedConfigDatastore)
@@ -98,6 +112,21 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
                 ACTOR_ID);
 
         this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();
+
+        //create shard registration for DEFAULT_SHARD
+        try {
+            defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
+                    initDefaultShard(LogicalDatastoreType.CONFIGURATION));
+        } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
+            LOG.error("Unable to create default shard frontend for config shard", e);
+        }
+
+        try {
+            defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
+                    initDefaultShard(LogicalDatastoreType.OPERATIONAL));
+        } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
+            LOG.error("Unable to create default shard frontend for operational shard", e);
+        }
     }
 
     @Nonnull
@@ -113,13 +142,15 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     @Nonnull
     @Override
     public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
-        LOG.debug("Creating producer for {}", subtrees);
+        LOG.debug("{} - Creating producer for {}",
+                distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
         final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);
 
         final Object response = distributedConfigDatastore.getActorContext()
                 .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
         if (response == null) {
-            LOG.debug("Received success from remote nodes, creating producer:{}", subtrees);
+            LOG.debug("{} - Received success from remote nodes, creating producer:{}",
+                    distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
             return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
                     distributedConfigDatastore.getActorContext());
         } else if (response instanceof Exception) {
@@ -133,74 +164,92 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
+    //TODO: it would be better to block here until the message is processed by the actor
     public DistributedShardRegistration createDistributedShard(
             final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
-            throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException,
-            DOMDataTreeShardCreationFailedException {
+            throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException {
+        final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
+                shards.lookup(prefix);
+        if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
+            throw new DOMDataTreeShardingConflictException(
+                    "Prefix " + prefix + " is already occupied by another shard.");
+        }
+
+        PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
+        shardedDataTreeActor.tell(new CreatePrefixShard(config), noSender());
+
+        return new DistributedShardRegistrationImpl(prefix, shardedDataTreeActor, this);
+    }
+
+    void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
+        LOG.debug("Member {}: Resolving additions : {}", memberName, additions);
+        final ArrayList<DOMDataTreeIdentifier> list = new ArrayList<>(additions);
+        // we need to register the shards from top to bottom, so we need to atleast make sure the ordering reflects that
+        Collections.sort(list, (o1, o2) -> {
+            if (o1.getRootIdentifier().getPathArguments().size() < o2.getRootIdentifier().getPathArguments().size()) {
+                return -1;
+            } else if (o1.getRootIdentifier().getPathArguments().size()
+                    == o2.getRootIdentifier().getPathArguments().size()) {
+                return 0;
+            } else {
+                return 1;
+            }
+        });
+        list.forEach(this::createShardFrontend);
+    }
 
+    void resolveShardRemovals(final Set<DOMDataTreeIdentifier> removals) {
+        LOG.debug("Member {}: Resolving removals : {}", memberName, removals);
+
+        // do we need to go from bottom to top?
+        removals.forEach(this::despawnShardFrontend);
+    }
+
+    private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
+        LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
         final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
         final DistributedDataStore distributedDataStore =
                 prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
                         ? distributedConfigDatastore : distributedOperDatastore;
 
-        final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
-        if (replicaMembers.contains(memberName)) {
-            // spawn the backend shard and have the shard Manager create all replicas
-            final ActorRef shardManager = distributedDataStore.getActorContext().getShardManager();
-
-            shardManager.tell(new CreatePrefixedShard(config, null, Shard.builder()), noSender());
-        }
-
-        LOG.debug("Creating distributed datastore client for shard {}", shardName);
-        final Props distributedDataStoreClientProps =
-                SimpleDataStoreClientActor
-                        .props(memberName, "Shard-" + shardName, distributedDataStore.getActorContext(), shardName);
+        try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
+            final Entry<DataStoreClient, ActorRef> entry =
+                    createDatastoreClient(shardName, distributedDataStore.getActorContext());
 
-        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
-        final DataStoreClient client;
-        try {
-            client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
-        } catch (final Exception e) {
-            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
-            clientActor.tell(PoisonPill.getInstance(), noSender());
-            throw new DOMDataTreeProducerException("Unable to create producer", e);
-        }
+            final DistributedShardFrontend shard =
+                    new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);
 
-        // register the frontend into the sharding service and let the actor distribute this onto the other nodes
-        final ListenerRegistration<ShardFrontend> shardFrontendRegistration;
-        try (DOMDataTreeProducer producer = createProducer(Collections.singletonList(prefix))) {
-            shardFrontendRegistration = shardedDOMDataTree
-                    .registerDataTreeShard(prefix,
-                            new ShardFrontend(client, prefix),
-                            ((ProxyProducer) producer).delegate());
+            @SuppressWarnings("unchecked")
+            final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
+                    (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
+            shards.store(prefix, reg);
+        } catch (final DOMDataTreeShardingConflictException e) {
+            LOG.error("Prefix {} is already occupied by another shard", prefix, e);
+        } catch (DOMDataTreeProducerException e) {
+            LOG.error("Unable to close producer", e);
+        } catch (DOMDataTreeShardCreationFailedException e) {
+            LOG.error("Unable to create datastore client for shard {}", prefix, e);
         }
+    }
 
-        final Future<Object> future = distributedDataStore.getActorContext()
-                .executeOperationAsync(shardedDataTreeActor, new PrefixShardCreated(config), DEFAULT_ASK_TIMEOUT);
-        try {
-            final Object result = Await.result(future, DEFAULT_ASK_TIMEOUT.duration());
-            if (result != null) {
-                throw new DOMDataTreeShardCreationFailedException("Received unexpected response to PrefixShardCreated"
-                        + result);
-            }
-
-            return new DistributedShardRegistrationImpl(shardFrontendRegistration, prefix, shardedDataTreeActor);
-        } catch (final CompletionException e) {
-            shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
-            clientActor.tell(PoisonPill.getInstance(), noSender());
+    private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
+        LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
+        final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
+                shards.lookup(prefix);
 
-            final Throwable cause = e.getCause();
-            if (cause instanceof DOMDataTreeShardingConflictException) {
-                throw (DOMDataTreeShardingConflictException) cause;
-            }
+        if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
+            LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
+                    memberName, prefix);
+            return;
+        }
 
-            throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e.getCause());
-        } catch (final Exception e) {
-            shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
-            clientActor.tell(PoisonPill.getInstance(), noSender());
+        lookup.getValue().close();
+        // need to remove from our local table thats used for tracking
+        shards.remove(prefix);
+    }
 
-            throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e);
-        }
+    DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
+        return shardedDOMDataTree.createProducer(prefix);
     }
 
     @Nonnull
@@ -216,6 +265,38 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private Entry<DataStoreClient, ActorRef> createDatastoreClient(
+            final String shardName, final ActorContext actorContext)
+            throws DOMDataTreeShardCreationFailedException {
+
+        LOG.debug("Creating distributed datastore client for shard {}", shardName);
+        final Props distributedDataStoreClientProps =
+                SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName);
+
+        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+        try {
+            return new SimpleEntry<>(SimpleDataStoreClientActor
+                    .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor);
+        } catch (final Exception e) {
+            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), noSender());
+            throw new DOMDataTreeShardCreationFailedException(
+                    "Unable to create datastore client for shard{" + shardName + "}", e);
+        }
+    }
+
+    private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
+            throws DOMDataTreeProducerException, DOMDataTreeShardingConflictException {
+        final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
+            Cluster.get(actorSystem).state().members()).asJavaCollection();
+        final Collection<MemberName> names = Collections2.transform(members,
+            m -> MemberName.forName(m.roles().iterator().next()));
+
+        return createDistributedShard(
+                new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names);
+    }
+
     private static void closeProducer(final DOMDataTreeProducer producer) {
         try {
             producer.close();
@@ -246,24 +327,25 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     }
 
     private static class DistributedShardRegistrationImpl implements DistributedShardRegistration {
-        private final ListenerRegistration<ShardFrontend> registration;
+
         private final DOMDataTreeIdentifier prefix;
         private final ActorRef shardedDataTreeActor;
+        private final DistributedShardedDOMDataTree distributedShardedDOMDataTree;
 
-        DistributedShardRegistrationImpl(final ListenerRegistration<ShardFrontend> registration,
-                                         final DOMDataTreeIdentifier prefix,
-                                         final ActorRef shardedDataTreeActor) {
-            this.registration = registration;
+        DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix,
+                                         final ActorRef shardedDataTreeActor,
+                                         final DistributedShardedDOMDataTree distributedShardedDOMDataTree) {
             this.prefix = prefix;
             this.shardedDataTreeActor = shardedDataTreeActor;
+            this.distributedShardedDOMDataTree = distributedShardedDOMDataTree;
         }
 
         @Override
         public void close() {
-            // TODO send the correct messages to ShardManager to destroy the shard
-            // maybe we could provide replica removal mechanisms also?
-            shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
-            registration.close();
+            // first despawn on the local node
+            distributedShardedDOMDataTree.despawnShardFrontend(prefix);
+            // update the config so the remote nodes are updated
+            shardedDataTreeActor.tell(new RemovePrefixShard(prefix), noSender());
         }
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardFrontend.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardFrontend.java
deleted file mode 100644 (file)
index d39ccec..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding;
-
-import com.google.common.base.Preconditions;
-import java.util.Collection;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
-import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
-import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-/**
- * Proxy implementation of a shard that creates forwarding producers to the backend shard.
- */
-class ShardFrontend implements ReadableWriteableDOMDataTreeShard {
-
-    private final DataStoreClient client;
-    private final DOMDataTreeIdentifier shardRoot;
-
-    ShardFrontend(final DataStoreClient client,
-                  final DOMDataTreeIdentifier shardRoot) {
-        this.client = Preconditions.checkNotNull(client);
-        this.shardRoot = Preconditions.checkNotNull(shardRoot);
-    }
-
-    @Override
-    public DOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> paths) {
-        return new ShardProxyProducer(shardRoot, paths, client);
-    }
-
-    @Override
-    public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
-        // TODO message directly into the shard
-    }
-
-    @Override
-    public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
-        // TODO message directly into the shard
-    }
-
-    @Nonnull
-    @Override
-    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
-            final YangInstanceIdentifier treeId, final L listener) {
-        throw new UnsupportedOperationException("Registering data tree change listener is not supported");
-    }
-}
index f977076..3c8db5f 100644 (file)
@@ -11,26 +11,38 @@ package org.opendaylight.controller.cluster.sharding;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
+import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataTreeShard;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Proxy producer implementation that creates transactions that forward all calls to {@link DataStoreClient}.
  */
 class ShardProxyProducer implements DOMDataTreeShardProducer {
 
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShard.class);
+    private static final AtomicLong COUNTER = new AtomicLong();
+
     private final DOMDataTreeIdentifier shardRoot;
     private final Collection<DOMDataTreeIdentifier> prefixes;
-    private final DataStoreClient client;
+    private final ClientLocalHistory history;
+    private DistributedShardModificationFactory modificationFactory;
 
-    ShardProxyProducer(final DOMDataTreeIdentifier shardRoot, final Collection<DOMDataTreeIdentifier> prefixes,
-                       final DataStoreClient client) {
+    ShardProxyProducer(final DOMDataTreeIdentifier shardRoot,
+                       final Collection<DOMDataTreeIdentifier> prefixes,
+                       final DataStoreClient client,
+                       final DistributedShardModificationFactory modificationFactory) {
         this.shardRoot = Preconditions.checkNotNull(shardRoot);
         this.prefixes = ImmutableList.copyOf(Preconditions.checkNotNull(prefixes));
-        this.client = Preconditions.checkNotNull(client);
+        this.modificationFactory = Preconditions.checkNotNull(modificationFactory);
+        history = Preconditions.checkNotNull(client).createLocalHistory();
     }
 
     @Nonnull
@@ -41,7 +53,16 @@ class ShardProxyProducer implements DOMDataTreeShardProducer {
 
     @Override
     public DOMDataTreeShardWriteTransaction createTransaction() {
-        return new ShardProxyTransaction(shardRoot, prefixes, client);
+        return new ShardProxyTransaction(shardRoot, prefixes,
+                modificationFactory.createModification(history.createTransaction()));
+    }
+
+    DistributedShardModificationFactory getModificationFactory() {
+        return modificationFactory;
+    }
+
+    void setModificationFactory(final DistributedShardModificationFactory modificationFactory) {
+        this.modificationFactory = Preconditions.checkNotNull(modificationFactory);
     }
 }
 
index b3c8dfc..d08b47b 100644 (file)
@@ -8,18 +8,27 @@
 
 package org.opendaylight.controller.cluster.sharding;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.store.inmemory.ForeignShardThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,32 +39,43 @@ import org.slf4j.LoggerFactory;
 class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
-    private static final ListenableFuture<Void> NULL_FUTURE = Futures.immediateFuture(null);
-    private static final ListenableFuture<Boolean> VALIDATE_FUTURE = Futures.immediateFuture(true);
 
     private final DOMDataTreeIdentifier shardRoot;
     private final Collection<DOMDataTreeIdentifier> prefixes;
-    private final DataStoreClient client;
-    private final ClientLocalHistory history;
+    private final DistributedShardModification modification;
     private ClientTransaction currentTx;
-    private DOMStoreThreePhaseCommitCohort cohort;
+    private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
 
+    private DOMDataTreeWriteCursor cursor = null;
 
-    ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot, final Collection<DOMDataTreeIdentifier> prefixes,
-                          final DataStoreClient client) {
+    ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
+                          final Collection<DOMDataTreeIdentifier> prefixes,
+                          final DistributedShardModification modification) {
         this.shardRoot = Preconditions.checkNotNull(shardRoot);
         this.prefixes = Preconditions.checkNotNull(prefixes);
-        this.client = Preconditions.checkNotNull(client);
-        history = client.createLocalHistory();
-        currentTx = history.createTransaction();
+        this.modification = Preconditions.checkNotNull(modification);
+    }
+
+    private DOMDataTreeWriteCursor getCursor() {
+        if (cursor == null) {
+            cursor = new DistributedShardModificationCursor(modification, this);
+        }
+        return cursor;
     }
 
     @Nonnull
     @Override
     public DOMDataTreeWriteCursor createCursor(@Nonnull final DOMDataTreeIdentifier prefix) {
         checkAvailable(prefix);
+        final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
+        final DOMDataTreeWriteCursor ret = getCursor();
+        ret.enter(relativePath.getPathArguments());
+        return ret;
+    }
 
-        return currentTx.openCursor();
+    void cursorClosed() {
+        cursor = null;
+        modification.cursorClosed();
     }
 
     private void checkAvailable(final DOMDataTreeIdentifier prefix) {
@@ -68,21 +88,31 @@ class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
                 + "Available prefixes: " + prefixes);
     }
 
+    private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
+        final Optional<YangInstanceIdentifier> relative =
+                path.relativeTo(modification.getPrefix().getRootIdentifier());
+        Preconditions.checkArgument(relative.isPresent());
+        return relative.get();
+    }
+
     @Override
     public void ready() {
         LOG.debug("Readying transaction for shard {}", shardRoot);
 
-        Preconditions.checkState(cohort == null, "Transaction was readied already");
-        cohort = currentTx.ready();
-        currentTx = null;
+        Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
+
+        cohorts.add(modification.seal());
+        for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
+                : modification.getChildShards().entrySet()) {
+            cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
+        }
     }
 
     @Override
     public void close() {
-        if (cohort != null) {
-            cohort.abort();
-            cohort = null;
-        }
+        cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
+        cohorts.clear();
+
         if (currentTx != null) {
             currentTx.abort();
             currentTx = null;
@@ -93,31 +123,86 @@ class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
     public ListenableFuture<Void> submit() {
         LOG.debug("Submitting transaction for shard {}", shardRoot);
 
-        Preconditions.checkNotNull(cohort, "Transaction not readied yet");
-        return NULL_FUTURE;
+        Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+
+        final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
+        final AsyncFunction<Void, Void> prepareFunction = input -> commit();
+
+        // transform validate into prepare
+        final ListenableFuture<Void> prepareFuture = Futures.transform(validate(), validateFunction);
+        // transform prepare into commit and return as submit result
+        return Futures.transform(prepareFuture, prepareFunction);
     }
 
     @Override
     public ListenableFuture<Boolean> validate() {
         LOG.debug("Validating transaction for shard {}", shardRoot);
 
-        Preconditions.checkNotNull(cohort, "Transaction not readied yet");
-        return VALIDATE_FUTURE;
+        Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+        final List<ListenableFuture<Boolean>> futures =
+                cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
+        final SettableFuture<Boolean> ret = SettableFuture.create();
+
+        Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
+            @Override
+            public void onSuccess(final List<Boolean> result) {
+                ret.set(true);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                ret.setException(throwable);
+            }
+        });
+
+        return ret;
     }
 
     @Override
     public ListenableFuture<Void> prepare() {
         LOG.debug("Preparing transaction for shard {}", shardRoot);
 
-        Preconditions.checkNotNull(cohort, "Transaction not readied yet");
-        return NULL_FUTURE;
+        Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+        final List<ListenableFuture<Void>> futures =
+                cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
+        final SettableFuture<Void> ret = SettableFuture.create();
+
+        Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
+            @Override
+            public void onSuccess(final List<Void> result) {
+                ret.set(null);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                ret.setException(throwable);
+            }
+        });
+
+        return ret;
     }
 
     @Override
     public ListenableFuture<Void> commit() {
         LOG.debug("Committing transaction for shard {}", shardRoot);
 
-        Preconditions.checkNotNull(cohort, "Transaction not readied yet");
-        return NULL_FUTURE;
+        Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+        final List<ListenableFuture<Void>> futures =
+                cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
+        final SettableFuture<Void> ret = SettableFuture.create();
+
+        Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
+            @Override
+            public void onSuccess(final List<Void> result) {
+                ret.set(null);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                ret.setException(throwable);
+            }
+        });
+
+        return ret;
     }
 }
index c1a099b..3c1ae10 100644 (file)
@@ -16,6 +16,7 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
+import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.cluster.ClusterEvent.MemberExited;
 import akka.cluster.ClusterEvent.MemberRemoved;
@@ -24,22 +25,27 @@ import akka.cluster.ClusterEvent.MemberWeaklyUp;
 import akka.cluster.ClusterEvent.ReachableMember;
 import akka.cluster.ClusterEvent.UnreachableMember;
 import akka.cluster.Member;
+import akka.cluster.ddata.DistributedData;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.Replicator;
+import akka.cluster.ddata.Replicator.Changed;
+import akka.cluster.ddata.Replicator.Subscribe;
+import akka.cluster.ddata.Replicator.Update;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
@@ -53,13 +59,9 @@ import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
 import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import scala.compat.java8.FutureConverters;
@@ -73,10 +75,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private static final String PERSISTENCE_ID = "sharding-service-actor";
     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
 
-    private final DOMDataTreeService dataTreeService;
-    private final DOMDataTreeShardingService shardingService;
+    private final DistributedShardedDOMDataTree shardingService;
     private final ActorSystem actorSystem;
-    private final ClusterWrapper cluster;
+    private final ClusterWrapper clusterWrapper;
     // helper actorContext used only for static calls to executeAsync etc
     // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
     private final ActorContext actorContext;
@@ -87,20 +88,35 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
     private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
 
+    private final Cluster cluster;
+    private final ActorRef replicator;
+
+    private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
+    private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
+
     ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
         LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
 
-        dataTreeService = builder.getDataTreeService();
         shardingService = builder.getShardingService();
         actorSystem = builder.getActorSystem();
-        cluster = builder.getClusterWrapper();
+        clusterWrapper = builder.getClusterWrapper();
         distributedConfigDatastore = builder.getDistributedConfigDatastore();
         distributedOperDatastore = builder.getDistributedOperDatastore();
         actorContext = distributedConfigDatastore.getActorContext();
         resolver = new ShardingServiceAddressResolver(
-                DistributedShardedDOMDataTree.ACTOR_ID, cluster.getCurrentMemberName());
+                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
+
+        clusterWrapper.subscribeToMemberEvents(self());
+        cluster = Cluster.get(actorSystem);
+
+        replicator = DistributedData.get(context().system()).replicator();
+    }
 
-        cluster.subscribeToMemberEvents(self());
+    @Override
+    public void preStart() {
+        final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
+                new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
+        replicator.tell(subscribe, noSender());
     }
 
     @Override
@@ -110,6 +126,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
     @Override
     protected void handleCommand(final Object message) throws Exception {
+        LOG.debug("Received {}", message);
         if (message instanceof ClusterEvent.MemberUp) {
             memberUp((ClusterEvent.MemberUp) message);
         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
@@ -122,6 +139,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             memberUnreachable((ClusterEvent.UnreachableMember) message);
         } else if (message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
+        } else if (message instanceof Changed) {
+            onConfigChanged((Changed) message);
         } else if (message instanceof ProducerCreated) {
             onProducerCreated((ProducerCreated) message);
         } else if (message instanceof NotifyProducerCreated) {
@@ -141,6 +160,42 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
+        LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);
+
+        currentData = change.dataValue();
+        final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
+
+        LOG.debug("Changed set {}", changedConfig);
+
+        try {
+            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
+                    changedConfig.values().stream().collect(
+                            Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
+            resolveConfig(newConfig);
+        } catch (final IllegalStateException e) {
+            LOG.error("Failed, ", e);
+        }
+
+    }
+
+    private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
+
+        // get the removed configurations
+        final SetView<DOMDataTreeIdentifier> deleted =
+                Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
+        shardingService.resolveShardRemovals(deleted);
+
+        // get the added configurations
+        final SetView<DOMDataTreeIdentifier> additions =
+                Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
+        shardingService.resolveShardAdditions(additions);
+        // we can ignore those that existed previously since the potential changes in replicas will be handled by
+        // shard manager.
+
+        currentConfiguration = new HashMap<>(newConfig);
+    }
+
     @Override
     public String persistenceId() {
         return PERSISTENCE_ID;
@@ -198,6 +253,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
     private void onProducerCreated(final ProducerCreated message) {
         LOG.debug("Received ProducerCreated: {}", message);
+
+        // fastpath if no replication is needed, since there is only one node
+        if (resolver.getShardingServicePeerActorAddresses().size() == 1) {
+            getSender().tell(new Status.Success(null), noSender());
+        }
+
         final ActorRef sender = getSender();
         final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
 
@@ -216,18 +277,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                 futures.toArray(new CompletableFuture[futures.size()]));
 
         combinedFuture.thenRun(() -> {
-            for (final CompletableFuture<Object> future : futures) {
-                try {
-                    final Object result = future.get();
-                    if (result instanceof Status.Failure) {
-                        sender.tell(result, self());
-                        return;
-                    }
-                } catch (InterruptedException | ExecutionException e) {
-                    sender.tell(new Status.Failure(e), self());
-                    return;
-                }
-            }
             sender.tell(new Status.Success(null), noSender());
         }).exceptionally(throwable -> {
             sender.tell(new Status.Failure(throwable), self());
@@ -242,7 +291,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
         try {
             final ActorProducerRegistration registration =
-                    new ActorProducerRegistration(dataTreeService.createProducer(subtrees), subtrees);
+                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
             subtrees.forEach(id -> idToProducer.put(id, registration));
             sender().tell(new Status.Success(null), self());
         } catch (final IllegalArgumentException e) {
@@ -298,59 +347,19 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void onCreatePrefixShard(final CreatePrefixShard message) {
-        LOG.debug("Received CreatePrefixShard: {}", message);
+        LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
 
         final PrefixShardConfiguration configuration = message.getConfiguration();
 
-        final DOMDataTreeProducer producer =
-                dataTreeService.createProducer(Collections.singleton(configuration.getPrefix()));
-
-        final DistributedDataStore distributedDataStore =
-                configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION
-                        ? distributedConfigDatastore : distributedOperDatastore;
-        final String shardName = ClusterUtils.getCleanShardName(configuration.getPrefix().getRootIdentifier());
-        LOG.debug("Creating distributed datastore client for shard {}", shardName);
-        final Props distributedDataStoreClientProps =
-                SimpleDataStoreClientActor.props(cluster.getCurrentMemberName(),
-                        "Shard-" + shardName, distributedDataStore.getActorContext(), shardName);
-
-        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
-        final DataStoreClient client;
-        try {
-            client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
-        } catch (final Exception e) {
-            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
-            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            throw Throwables.propagate(e);
-        }
-
-        try {
-            final ListenerRegistration<ShardFrontend> shardFrontendRegistration =
-                    shardingService.registerDataTreeShard(configuration.getPrefix(),
-                            new ShardFrontend(
-                                    client,
-                                    configuration.getPrefix()
-                            ),
-                            producer);
-            idToShardRegistration.put(configuration.getPrefix(),
-                    new ShardFrontendRegistration(clientActor, shardFrontendRegistration));
+        final Update<ORMap<PrefixShardConfiguration>> update =
+                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
+                    map -> map.put(cluster, configuration.toDataMapKey(), configuration));
 
-            sender().tell(new Status.Success(null), self());
-        } catch (final DOMDataTreeShardingConflictException e) {
-            LOG.error("Unable to create shard", e);
-            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            sender().tell(new Status.Failure(e), self());
-        } finally {
-            try {
-                producer.close();
-            } catch (final DOMDataTreeProducerException e) {
-                LOG.error("Unable to close producer that was used for shard registration {}", producer, e);
-            }
-        }
+        replicator.tell(update, self());
     }
 
     private void onPrefixShardCreated(final PrefixShardCreated message) {
-        LOG.debug("Received PrefixShardCreated: {}", message);
+        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
 
         final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
         final ActorRef sender = getSender();
@@ -367,18 +376,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
 
         combinedFuture.thenRun(() -> {
-            for (final CompletableFuture<Object> future : futures) {
-                try {
-                    final Object result = future.get();
-                    if (result instanceof Status.Failure) {
-                        sender.tell(result, self());
-                        return;
-                    }
-                } catch (InterruptedException | ExecutionException e) {
-                    sender.tell(new Status.Failure(e), self());
-                    return;
-                }
-            }
             sender.tell(new Status.Success(null), self());
         }).exceptionally(throwable -> {
             sender.tell(new Status.Failure(throwable), self());
@@ -387,12 +384,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     }
 
     private void onRemovePrefixShard(final RemovePrefixShard message) {
-        LOG.debug("Received RemovePrefixShard: {}", message);
+        LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
 
-        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
-            final ActorSelection selection = actorContext.actorSelection(address);
-            selection.tell(new PrefixShardRemoved(message.getPrefix()), getSelf());
-        }
+        //TODO the removal message should have the configuration or some other way to get to the key
+        final Update<ORMap<PrefixShardConfiguration>> removal =
+                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
+                    map -> map.remove(cluster, "prefix=" + message.getPrefix()));
+        replicator.tell(removal, self());
     }
 
     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
@@ -431,13 +429,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     }
 
     private static class ShardFrontendRegistration extends
-            AbstractObjectRegistration<ListenerRegistration<ShardFrontend>> {
+            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {
 
         private final ActorRef clientActor;
-        private final ListenerRegistration<ShardFrontend> shardRegistration;
+        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;
 
         ShardFrontendRegistration(final ActorRef clientActor,
-                                         final ListenerRegistration<ShardFrontend> shardRegistration) {
+                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
             super(shardRegistration);
             this.clientActor = clientActor;
             this.shardRegistration = shardRegistration;
@@ -452,27 +450,17 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
     public static class ShardedDataTreeActorCreator {
 
-        private DOMDataTreeService dataTreeService;
-        private DOMDataTreeShardingService shardingService;
+        private DistributedShardedDOMDataTree shardingService;
         private DistributedDataStore distributedConfigDatastore;
         private DistributedDataStore distributedOperDatastore;
         private ActorSystem actorSystem;
         private ClusterWrapper cluster;
 
-        public DOMDataTreeService getDataTreeService() {
-            return dataTreeService;
-        }
-
-        public ShardedDataTreeActorCreator setDataTreeService(final DOMDataTreeService dataTreeService) {
-            this.dataTreeService = dataTreeService;
-            return this;
-        }
-
-        public DOMDataTreeShardingService getShardingService() {
+        public DistributedShardedDOMDataTree getShardingService() {
             return shardingService;
         }
 
-        public ShardedDataTreeActorCreator setShardingService(final DOMDataTreeShardingService shardingService) {
+        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
             this.shardingService = shardingService;
             return this;
         }
@@ -516,7 +504,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
 
         private void verify() {
-            Preconditions.checkNotNull(dataTreeService);
             Preconditions.checkNotNull(shardingService);
             Preconditions.checkNotNull(actorSystem);
             Preconditions.checkNotNull(cluster);
index 20c0ea9..b9bf791 100644 (file)
@@ -32,4 +32,13 @@ public class CreatePrefixShard implements Serializable {
     public PrefixShardConfiguration getConfiguration() {
         return configuration;
     }
+
+
+    @Override
+    public String toString() {
+        return "CreatePrefixShard{"
+                + "configuration="
+                + configuration
+                + '}';
+    }
 }
index f7113ab..d468992 100644 (file)
@@ -31,4 +31,11 @@ public class PrefixShardCreated {
     public PrefixShardConfiguration getConfiguration() {
         return configuration;
     }
+
+    @Override
+    public String toString() {
+        return "PrefixShardCreated{"
+                + "configuration=" + configuration
+                + '}';
+    }
 }
index 5f84071..1890b64 100644 (file)
@@ -14,8 +14,9 @@ import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 
 /**
- * Message sent to remote {@link ShardedDataTreeActor}'s when there is an
- * attempt to remove a shard from the sharding service.
+ * Message sent to remote {@link ShardedDataTreeActor}'s when there is an attempt to remove the shard,
+ * the ShardedDataTreeActor should remove the shard from the current configuration so that the change is picked up
+ * in the backend ShardManager.
  */
 @Beta
 public class PrefixShardRemoved implements Serializable {
index fa74af6..6de1bb0 100644 (file)
@@ -13,8 +13,9 @@ import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 
 /**
- * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node. The local actor
- * should then notify the remote nodes of the Removal with {@link PrefixShardRemoved} message.
+ * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node.
+ * The local actor should update the configuration so that the change is picked up by other CDS Node Agents and
+ * backend ShardManagers.
  */
 public class RemovePrefixShard {
 
@@ -28,4 +29,11 @@ public class RemovePrefixShard {
     public DOMDataTreeIdentifier getPrefix() {
         return prefix;
     }
+
+    @Override
+    public String toString() {
+        return "RemovePrefixShard{"
+                + "prefix=" + prefix
+                + '}';
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractClusterRefActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractClusterRefActorTest.java
new file mode 100644 (file)
index 0000000..f47ff18
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public abstract class AbstractClusterRefActorTest extends AbstractTest {
+    private static ActorSystem system;
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        System.setProperty("shard.persistent", "false");
+        system = ActorSystem.create("test", ConfigFactory.load().getConfig("test"));
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws IOException {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    protected static ActorSystem getSystem() {
+        return system;
+    }
+}
index 15e6b03..80cc2e7 100644 (file)
@@ -32,7 +32,7 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
-public class AbstractShardManagerTest extends AbstractActorTest {
+public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
 
     protected static final MemberName MEMBER_1 = MemberName.forName("member-1");
 
index ade3022..1b9ea16 100644 (file)
@@ -76,6 +76,7 @@ import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -97,8 +98,28 @@ public abstract class AbstractTransactionProxyTest extends AbstractTest {
 
     private final Configuration configuration = new MockConfiguration() {
         Map<String, ShardStrategy> strategyMap = ImmutableMap.<String, ShardStrategy>builder().put(
-                "junk", path -> "junk").put(
-                "cars", path -> "cars").build();
+                "junk", new ShardStrategy() {
+                    @Override
+                    public String findShard(YangInstanceIdentifier path) {
+                        return "junk";
+                    }
+
+                    @Override
+                    public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+                        return YangInstanceIdentifier.EMPTY;
+                    }
+                }).put(
+                "cars", new ShardStrategy() {
+                    @Override
+                    public String findShard(YangInstanceIdentifier path) {
+                        return "cars";
+                    }
+
+                    @Override
+                    public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+                        return YangInstanceIdentifier.EMPTY;
+                    }
+                }).build();
 
         @Override
         public ShardStrategy getStrategyForModule(String moduleName) {
@@ -157,7 +178,8 @@ public abstract class AbstractTransactionProxyTest extends AbstractTest {
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
-        doReturn(new ShardStrategyFactory(configuration)).when(mockActorContext).getShardStrategyFactory();
+        doReturn(new ShardStrategyFactory(configuration,
+                LogicalDatastoreType.CONFIGURATION)).when(mockActorContext).getShardStrategyFactory();
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
         doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
index 67af9bb..96c45f3 100644 (file)
@@ -126,7 +126,7 @@ public class DatastoreSnapshotRestoreTest {
     }
 
     private static ShardManagerSnapshot newShardManagerSnapshot(String... shards) {
-        return new ShardManagerSnapshot(Arrays.asList(shards));
+        return new ShardManagerSnapshot(Arrays.asList(shards), Collections.emptyMap());
     }
 
     private static Snapshot newSnapshot(YangInstanceIdentifier path, NormalizedNode<?, ?> node)
index 3560260..28c4fa3 100644 (file)
@@ -41,12 +41,16 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
 public class IntegrationTestKit extends ShardTestKit {
 
+    private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestKit.class);
+
     protected DatastoreContext.Builder datastoreContextBuilder;
     protected DatastoreSnapshot restoreFromSnapshot;
 
@@ -171,6 +175,19 @@ public class IntegrationTestKit extends ShardTestKit {
         return shard;
     }
 
+    public static void waitUntilShardIsDown(ActorContext actorContext, String shardName) {
+        for (int i = 0; i < 20 * 5 ; i++) {
+            LOG.debug("Waiting for shard down {}", shardName);
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
+            if (!shardReply.isPresent()) {
+                return;
+            }
+        }
+
+        throw new IllegalStateException("Shard[" + shardName + " did not shutdown in time");
+    }
+
     public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
             final ShardStatsVerifier verifier) throws Exception {
         ActorContext actorContext = datastore.getActorContext();
index c87b2f4..289c6b7 100644 (file)
@@ -9,33 +9,20 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.AddressFromURIString;
 import akka.actor.Status.Success;
-import akka.cluster.Cluster;
-import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import com.google.common.collect.Lists;
-import com.typesafe.config.ConfigFactory;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.Shard.Builder;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -55,8 +42,6 @@ public class PrefixShardCreationTest extends AbstractShardManagerTest {
     private static final DOMDataTreeIdentifier TEST_ID =
             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
 
-    private static final MemberName MEMBER_2 = MemberName.forName("member-2");
-
     @Test
     public void testPrefixShardCreation() throws Exception {
 
@@ -92,98 +77,4 @@ public class PrefixShardCreationTest extends AbstractShardManagerTest {
             }
         };
     }
-
-    @Test
-    public void testPrefixShardReplicas() throws Exception {
-        LOG.info("testPrefixShardReplicas starting");
-        final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
-
-        // Create ACtorSystem for member-1
-        final ActorSystem system1 = newActorSystem("Member1");
-        Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
-
-        final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
-                newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
-                        .waitTillReadyCountDownLatch(ready)
-                        .cluster(new ClusterWrapperImpl(system1))
-                        .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                shardManagerID);
-
-        // Create an ActorSystem ShardManager actor for member-2.
-
-        final ActorSystem system2 = newActorSystem("Member2");
-
-        Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
-
-        final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
-                newTestShardMgrBuilder()
-                        .configuration(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
-                        .waitTillReadyCountDownLatch(ready)
-                        .cluster(new ClusterWrapperImpl(system2)).props().withDispatcher(
-                        Dispatchers.DefaultDispatcherId()),
-                shardManagerID);
-
-        final JavaTestKit kit2 = new JavaTestKit(system2);
-
-        new JavaTestKit(system1) {
-            {
-                shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-                shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-
-                // check shard does not exist
-                shardManager1.tell(new FindLocalShard(
-                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
-                shardManager2.tell(new FindLocalShard(
-                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
-                kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
-                // create shard on node1
-                final Builder builder = Shard.builder();
-
-                final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
-                        new PrefixShardConfiguration(TEST_ID,
-                                PrefixShardStrategy.NAME,
-                                Lists.newArrayList(MEMBER_1, MEMBER_2)),
-                        datastoreContextBuilder.build(), builder);
-
-                shardManager1.tell(createPrefixedShard, getRef());
-                expectMsgClass(duration("5 seconds"), Success.class);
-
-                shardManager1.underlyingActor().waitForMemberUp();
-
-                LOG.info("changed leader state");
-
-                // check node2 cannot find it locally
-                shardManager1.tell(new FindLocalShard(
-                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
-                shardManager2.tell(new FindLocalShard(
-                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
-                kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
-                // but can remotely
-                shardManager2.tell(new FindPrimary(
-                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
-                kit2.expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
-
-                // add replica and verify if succesful
-                shardManager2.tell(new AddPrefixShardReplica(TEST_ID.getRootIdentifier()), kit2.getRef());
-                kit2.expectMsgClass(duration("5 seconds"), Success.class);
-
-                // verify we have a replica on manager2 now
-                shardManager2.tell(new FindLocalShard(
-                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
-                kit2.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-            }
-        };
-    }
-
-    private ActorSystem newActorSystem(final String config) {
-        final ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
-        actorSystems.add(system);
-        return system;
-    }
-}
+}
\ No newline at end of file
index 6cfde54..d484f99 100644 (file)
@@ -622,7 +622,6 @@ public class ShardTest extends AbstractShardTest {
                 final ReadyTransactionReply readyReply = ReadyTransactionReply
                         .fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class));
                 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
-
                 // Send the CanCommitTransaction message for the first Tx.
 
                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractClusterRefEntityOwnershipTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractClusterRefEntityOwnershipTest.java
new file mode 100644 (file)
index 0000000..1354135
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class AbstractClusterRefEntityOwnershipTest extends AbstractEntityOwnershipTest {
+
+    private static ActorSystem system;
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        system = ActorSystem.create("test", ConfigFactory.load().getConfig("test"));
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws IOException {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    protected static ActorSystem getSystem() {
+        return system;
+    }
+}
index aac4b23..83056ba 100644 (file)
@@ -74,7 +74,7 @@ import scala.concurrent.duration.Duration;
  *
  * @author Thomas Pantelis
  */
-public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnershipTest {
+public class DistributedEntityOwnershipServiceTest extends AbstractClusterRefEntityOwnershipTest {
     static final String ENTITY_TYPE = "test";
     static final String ENTITY_TYPE2 = "test2";
     static final QName QNAME = QName.create("test", "2015-08-11", "foo");
index 8c8e329..f344815 100644 (file)
@@ -76,7 +76,7 @@ public class DatastoreSnapshotListTest {
         assertEquals("DatastoreSnapshotList size", 2, cloned.size());
         assertDatastoreSnapshotEquals(legacyConfigSnapshot, cloned.get(0),
                 new org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot(
-                        legacyShardManagerSnapshot.getShardList()),
+                        legacyShardManagerSnapshot.getShardList(), Collections.emptyMap()),
                 Optional.of(legacyConfigRoot1), Optional.of(legacyConfigRoot2));
         assertDatastoreSnapshotEquals(legacyOperSnapshot, cloned.get(1),
                 (org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot)null,
index 7e62963..26d08e6 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.persisted;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
+import java.util.Collections;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
 
@@ -22,7 +23,8 @@ public class ShardManagerSnapshotTest {
 
     @Test
     public void testSerialization() {
-        ShardManagerSnapshot expected = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2"));
+        ShardManagerSnapshot expected =
+                new ShardManagerSnapshot(Arrays.asList("shard1", "shard2"), Collections.emptyMap());
         ShardManagerSnapshot cloned = (ShardManagerSnapshot) SerializationUtils.clone(expected);
 
         assertEquals("getShardList", expected.getShardList(), cloned.getShardList());
index 76af089..c503184 100644 (file)
@@ -45,7 +45,7 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest {
         JavaTestKit kit = new JavaTestKit(getSystem());
 
         List<String> shardList = Arrays.asList("shard1", "shard2", "shard3");
-        ShardManagerSnapshot shardManagerSnapshot = new ShardManagerSnapshot(shardList);
+        ShardManagerSnapshot shardManagerSnapshot = new ShardManagerSnapshot(shardList, Collections.emptyMap());
         ActorRef replyActor = getSystem().actorOf(ShardManagerGetSnapshotReplyActor.props(
                 shardList, "config", shardManagerSnapshot, kit.getRef(),
                 "shard-manager", Duration.create(100, TimeUnit.SECONDS)), "testSuccess");
index ffa2641..c6cc641 100644 (file)
@@ -1392,7 +1392,8 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
                 .put("astronauts", Collections.<String>emptyList()).build());
 
-        ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
+        ShardManagerSnapshot snapshot =
+                new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
                 Collections.<ShardSnapshot>emptyList());
         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
@@ -1492,7 +1493,8 @@ public class ShardManagerTest extends AbstractShardManagerTest {
                 // Have a dummy snapshot to be overwritten by the new data
                 // persisted.
                 String[] restoredShards = { "default", "people" };
-                ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+                ShardManagerSnapshot snapshot =
+                        new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
                 InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
                 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
 
@@ -1947,12 +1949,14 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         LOG.info("testShardPersistenceWithRestoredData starting");
         new JavaTestKit(getSystem()) {
             {
-                MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
-                        .put("default", Arrays.asList("member-1", "member-2"))
-                        .put("astronauts", Arrays.asList("member-2"))
-                        .put("people", Arrays.asList("member-1", "member-2")).build());
-                String[] restoredShards = { "default", "astronauts" };
-                ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+                MockConfiguration mockConfig =
+                    new MockConfiguration(ImmutableMap.<String, List<String>>builder()
+                            .put("default", Arrays.asList("member-1", "member-2"))
+                            .put("astronauts", Arrays.asList("member-2"))
+                            .put("people", Arrays.asList("member-1", "member-2")).build());
+                String[] restoredShards = {"default", "astronauts"};
+                ShardManagerSnapshot snapshot =
+                        new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
                 InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
 
                 // create shardManager to come up with restored data
index 76cb733..e7b70e8 100644 (file)
@@ -18,6 +18,7 @@ import org.junit.rules.ExpectedException;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class ShardStrategyFactoryTest {
@@ -29,7 +30,8 @@ public class ShardStrategyFactoryTest {
 
     @Before
     public void setUp() {
-        factory = new ShardStrategyFactory(new ConfigurationImpl("module-shards.conf", "modules.conf"));
+        factory = new ShardStrategyFactory(
+                new ConfigurationImpl("module-shards.conf", "modules.conf"), LogicalDatastoreType.CONFIGURATION);
     }
 
     @Test
index 3a4c383..ec64075 100644 (file)
@@ -19,7 +19,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.cluster.datastore.config.ModuleConfig;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 
 public class MockConfiguration extends ConfigurationImpl {
     public MockConfiguration() {
@@ -41,7 +41,7 @@ public class MockConfiguration extends ConfigurationImpl {
     }
 
     @Override
-    public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+    public ShardStrategy getStrategyForPrefix(@Nonnull final DOMDataTreeIdentifier prefix) {
         return null;
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java
new file mode 100644 (file)
index 0000000..febc929
--- /dev/null
@@ -0,0 +1,212 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+public class DistributedShardFrontendTest {
+
+    private static final DOMDataTreeIdentifier ROOT =
+            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+    private static final ListenableFuture<Object> SUCCESS_FUTURE = Futures.immediateFuture(null);
+
+    private ShardedDOMDataTree shardedDOMDataTree;
+
+    private DataStoreClient client;
+    private ClientLocalHistory clientHistory;
+    private ClientTransaction clientTransaction;
+    private DOMDataTreeWriteCursor cursor;
+
+    private static final YangInstanceIdentifier OUTER_LIST_YID = TestModel.OUTER_LIST_PATH.node(
+            new NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+    private static final DOMDataTreeIdentifier OUTER_LIST_ID =
+            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, OUTER_LIST_YID);
+
+    @Captor
+    private ArgumentCaptor<YangInstanceIdentifier.PathArgument> pathArgumentCaptor;
+    @Captor
+    private ArgumentCaptor<NormalizedNode<?, ?>> nodeCaptor;
+
+    private DOMStoreThreePhaseCommitCohort commitCohort;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        shardedDOMDataTree = new ShardedDOMDataTree();
+        client = mock(DataStoreClient.class);
+        cursor = mock(DOMDataTreeWriteCursor.class);
+        clientTransaction = mock(ClientTransaction.class);
+        clientHistory = mock(ClientLocalHistory.class);
+        commitCohort = mock(DOMStoreThreePhaseCommitCohort.class);
+
+        doReturn(SUCCESS_FUTURE).when(commitCohort).canCommit();
+        doReturn(SUCCESS_FUTURE).when(commitCohort).preCommit();
+        doReturn(SUCCESS_FUTURE).when(commitCohort).commit();
+        doReturn(SUCCESS_FUTURE).when(commitCohort).abort();
+
+        doReturn(clientTransaction).when(client).createTransaction();
+        doReturn(clientTransaction).when(clientHistory).createTransaction();
+        doNothing().when(clientHistory).close();
+
+        doNothing().when(client).close();
+        doReturn(clientHistory).when(client).createLocalHistory();
+
+        doReturn(cursor).when(clientTransaction).openCursor();
+        doNothing().when(cursor).close();
+        doNothing().when(cursor).write(any(), any());
+        doNothing().when(cursor).merge(any(), any());
+        doNothing().when(cursor).delete(any());
+
+        doReturn(commitCohort).when(clientTransaction).ready();
+    }
+
+    @Test
+    public void testClientTransaction() throws Exception {
+
+        final DistributedDataStore distributedDataStore = mock(DistributedDataStore.class);
+        final DistributedShardFrontend rootShard = new DistributedShardFrontend(distributedDataStore, client, ROOT);
+
+        try (final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT))) {
+            shardedDOMDataTree.registerDataTreeShard(ROOT, rootShard, producer);
+        }
+
+        final DataStoreClient outerListClient = mock(DataStoreClient.class);
+        final ClientTransaction outerListClientTransaction = mock(ClientTransaction.class);
+        final ClientLocalHistory outerListClientHistory = mock(ClientLocalHistory.class);
+        final DOMDataTreeWriteCursor outerListCursor = mock(DOMDataTreeWriteCursor.class);
+
+        doNothing().when(outerListCursor).close();
+        doNothing().when(outerListCursor).write(any(), any());
+        doNothing().when(outerListCursor).merge(any(), any());
+        doNothing().when(outerListCursor).delete(any());
+
+        doReturn(outerListCursor).when(outerListClientTransaction).openCursor();
+        doReturn(outerListClientTransaction).when(outerListClient).createTransaction();
+        doReturn(outerListClientHistory).when(outerListClient).createLocalHistory();
+        doReturn(outerListClientTransaction).when(outerListClientHistory).createTransaction();
+
+        doReturn(commitCohort).when(outerListClientTransaction).ready();
+
+        doNothing().when(outerListClientHistory).close();
+        doNothing().when(outerListClient).close();
+
+        final DistributedShardFrontend outerListShard = new DistributedShardFrontend(
+                distributedDataStore, outerListClient, OUTER_LIST_ID);
+        try (final DOMDataTreeProducer producer =
+                     shardedDOMDataTree.createProducer(Collections.singletonList(OUTER_LIST_ID))) {
+            shardedDOMDataTree.registerDataTreeShard(OUTER_LIST_ID, outerListShard, producer);
+        }
+
+        final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT));
+        final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false);
+        final DOMDataTreeWriteCursor cursor = tx.createCursor(ROOT);
+
+        assertNotNull(cursor);
+        cursor.write(TestModel.TEST_PATH.getLastPathArgument(), createCrossShardContainer());
+
+        //check the lower shard got the correct modification
+        verify(outerListCursor, times(2)).write(pathArgumentCaptor.capture(), nodeCaptor.capture());
+
+        final YangInstanceIdentifier.PathArgument expectedYid = new NodeIdentifier(TestModel.ID_QNAME);
+        final YangInstanceIdentifier.PathArgument actualIdYid = pathArgumentCaptor.getAllValues().get(0);
+        assertEquals(expectedYid, actualIdYid);
+
+        final YangInstanceIdentifier.PathArgument expectedInnerYid = new NodeIdentifier(TestModel.INNER_LIST_QNAME);
+        final YangInstanceIdentifier.PathArgument actualInnerListYid = pathArgumentCaptor.getAllValues().get(1);
+        assertEquals(expectedInnerYid, actualInnerListYid);
+
+        final LeafNode<Integer> actualIdNode = (LeafNode<Integer>) nodeCaptor.getAllValues().get(0);
+        assertEquals(ImmutableNodes.leafNode(TestModel.ID_QNAME, 1), actualIdNode);
+
+        final MapNode actualInnerListNode = (MapNode) nodeCaptor.getAllValues().get(1);
+        assertEquals(createInnerMapNode(1), actualInnerListNode);
+
+        cursor.close();
+        tx.submit().checkedGet();
+
+        verify(commitCohort, times(2)).canCommit();
+        verify(commitCohort, times(2)).preCommit();
+        verify(commitCohort, times(2)).commit();
+
+    }
+
+    private static MapNode createInnerMapNode(final int id) {
+        final MapEntryNode listEntry = ImmutableNodes
+                .mapEntryBuilder(TestModel.INNER_LIST_QNAME, TestModel.NAME_QNAME, "name-" + id)
+                .withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "name-" + id))
+                .withChild(ImmutableNodes.leafNode(TestModel.VALUE_QNAME, "value-" + id))
+                .build();
+
+        return ImmutableNodes.mapNodeBuilder(TestModel.INNER_LIST_QNAME).withChild(listEntry).build();
+    }
+
+    private static ContainerNode createCrossShardContainer() {
+
+        final MapEntryNode outerListEntry1 =
+                ImmutableNodes.mapEntryBuilder(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)
+                        .withChild(createInnerMapNode(1))
+                        .build();
+        final MapEntryNode outerListEntry2 =
+                ImmutableNodes.mapEntryBuilder(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)
+                        .withChild(createInnerMapNode(2))
+                        .build();
+
+        final MapNode outerList = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+                .withChild(outerListEntry1)
+                .withChild(outerListEntry2)
+                .build();
+
+        final ContainerNode testContainer = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
+                .withChild(outerList)
+                .build();
+
+        return testContainer;
+    }
+
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java
new file mode 100644 (file)
index 0000000..eb109f1
--- /dev/null
@@ -0,0 +1,369 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
+import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.actor.PoisonPill;
+import akka.cluster.Cluster;
+import akka.cluster.ddata.DistributedData;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Ignore("Needs to have the configuration backend switched from distributed-data")
+public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
+
+    private static final Address MEMBER_1_ADDRESS =
+            AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
+
+    private static final DOMDataTreeIdentifier TEST_ID =
+            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+    private ActorSystem leaderSystem;
+    private ActorSystem followerSystem;
+
+
+    private final Builder leaderDatastoreContextBuilder =
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+
+    private final DatastoreContext.Builder followerDatastoreContextBuilder =
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
+                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+    private DistributedDataStore followerDistributedDataStore;
+    private DistributedDataStore leaderDistributedDataStore;
+    private IntegrationTestKit followerTestKit;
+    private IntegrationTestKit leaderTestKit;
+
+    private DistributedShardedDOMDataTree leaderShardFactory;
+    private DistributedShardedDOMDataTree followerShardFactory;
+
+    @Before
+    public void setUp() {
+
+        leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
+
+        followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+        Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
+
+    }
+
+    @After
+    public void tearDown() {
+        if (followerDistributedDataStore != null) {
+            followerDistributedDataStore.close();
+        }
+        if (leaderDistributedDataStore != null) {
+            leaderDistributedDataStore.close();
+        }
+
+        DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
+        DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+        JavaTestKit.shutdownActorSystem(leaderSystem);
+        JavaTestKit.shutdownActorSystem(followerSystem);
+    }
+
+    private void initEmptyDatastores(final String type) {
+        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+
+        leaderDistributedDataStore =
+                leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+
+        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+        followerDistributedDataStore =
+                followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+
+        leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
+                leaderDistributedDataStore,
+                leaderDistributedDataStore);
+
+        followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
+                followerDistributedDataStore,
+                followerDistributedDataStore);
+
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
+    }
+
+    @Test
+    @Ignore("Needs different shard creation handling due to replicas")
+    public void testProducerRegistrations() throws Exception {
+        initEmptyDatastores("config");
+
+        leaderTestKit.waitForMembersUp("member-2");
+
+        final DistributedShardRegistration shardRegistration =
+                leaderShardFactory.createDistributedShard(
+                        TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+
+        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
+
+        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
+
+        final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+        try {
+            followerShardFactory.createProducer(Collections.singleton(TEST_ID));
+            fail("Producer should be already registered on the other node");
+        } catch (final IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("is attached to producer"));
+        }
+
+        producer.close();
+
+        final DOMDataTreeProducer followerProducer =
+                followerShardFactory.createProducer(Collections.singleton(TEST_ID));
+        try {
+            leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+            fail("Producer should be already registered on the other node");
+        } catch (final IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("is attached to producer"));
+        }
+
+        followerProducer.close();
+        // try to create a shard on an already registered prefix on follower
+        try {
+            followerShardFactory.createDistributedShard(TEST_ID,
+                    Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+            fail("This prefix already should have a shard registration that was forwarded from the other node");
+        } catch (final DOMDataTreeShardingConflictException e) {
+            assertTrue(e.getMessage().contains("is already occupied by shard"));
+        }
+    }
+
+    @Test
+    @Ignore("Needs different shard creation handling due to replicas")
+    public void testWriteIntoMultipleShards() throws Exception {
+        initEmptyDatastores("config");
+
+        leaderTestKit.waitForMembersUp("member-2");
+
+        LOG.warn("registering first shard");
+        final DistributedShardRegistration shardRegistration =
+                leaderShardFactory.createDistributedShard(TEST_ID,
+                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+        findLocalShard(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+        LOG.warn("Got after waiting for nonleader");
+        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+
+        new JavaTestKit(leaderSystem) {
+            {
+                leaderShardManager.tell(
+                        new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+                final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
+
+                followerShardManager.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
+                followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+                LOG.warn("Found follower shard");
+
+                leaderDistributedDataStore.getActorContext().getShardManager().tell(
+                        new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
+            }
+        };
+
+        final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+
+        final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
+        final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
+        Assert.assertNotNull(cursor);
+        final YangInstanceIdentifier nameId =
+                YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
+        cursor.write(nameId.getLastPathArgument(),
+                ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
+                        new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
+
+        cursor.close();
+        LOG.warn("Got to pre submit");
+
+        tx.submit();
+    }
+
+    @Test
+    public void testMultipleShardRegistrations() throws Exception {
+        initEmptyDatastores("config");
+
+        final DistributedShardRegistration reg1 = leaderShardFactory
+                .createDistributedShard(TEST_ID,
+                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+        final DistributedShardRegistration reg2 = leaderShardFactory
+                .createDistributedShard(
+                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
+                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+        final DistributedShardRegistration reg3 = leaderShardFactory
+                .createDistributedShard(
+                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
+                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+        final DistributedShardRegistration reg4 = leaderShardFactory
+                .createDistributedShard(
+                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
+                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+        // check leader has local shards
+        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
+
+        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
+
+        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
+
+        // check follower has local shards
+        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
+
+        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
+
+        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
+
+
+        LOG.debug("Closing registrations");
+
+        reg1.close();
+        reg2.close();
+        reg3.close();
+        reg4.close();
+
+        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+
+        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+
+        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+        LOG.debug("All leader shards gone");
+
+        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+
+        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+
+        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+        LOG.debug("All follower shards gone");
+    }
+
+    @Test
+    public void testMultipleRegistrationsAtOnePrefix() throws Exception {
+        initEmptyDatastores("config");
+
+        for (int i = 0; i < 10; i++) {
+            LOG.debug("Round {}", i);
+            final DistributedShardRegistration reg1 = leaderShardFactory
+                    .createDistributedShard(TEST_ID,
+                            Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+            leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+            assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+            assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+            reg1.close();
+
+            waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+            waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+        }
+    }
+}
index 1e29bf6..88a12fe 100644 (file)
@@ -8,8 +8,9 @@
 
 package org.opendaylight.controller.cluster.sharding;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
+import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
+import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -18,14 +19,16 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.AbstractTest;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
@@ -35,75 +38,68 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Ignore("distributed-data is broken needs to be removed")
 public class DistributedShardedDOMDataTreeTest extends AbstractTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
+
     private static final Address MEMBER_1_ADDRESS =
             AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
 
     private static final DOMDataTreeIdentifier TEST_ID =
             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
 
-    private ShardedDOMDataTree shardedDOMDataTree = new ShardedDOMDataTree();
-
     private ActorSystem leaderSystem;
-    private ActorSystem followerSystem;
-
 
     private final Builder leaderDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+            DatastoreContext.newBuilder()
+                    .shardHeartbeatIntervalInMillis(100)
+                    .shardElectionTimeoutFactor(2)
+                    .logicalStoreType(
+                            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
 
-    private final DatastoreContext.Builder followerDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
-                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
-
-    private DistributedDataStore followerDistributedDataStore;
     private DistributedDataStore leaderDistributedDataStore;
-    private IntegrationTestKit followerTestKit;
     private IntegrationTestKit leaderTestKit;
 
     private DistributedShardedDOMDataTree leaderShardFactory;
-    private DistributedShardedDOMDataTree followerShardFactory;
 
     @Before
     public void setUp() {
-        shardedDOMDataTree = new ShardedDOMDataTree();
 
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
 
-        followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
-        Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
     }
 
     @After
     public void tearDown() {
-        if (followerDistributedDataStore != null) {
-            leaderDistributedDataStore.close();
-        }
         if (leaderDistributedDataStore != null) {
             leaderDistributedDataStore.close();
         }
 
         JavaTestKit.shutdownActorSystem(leaderSystem);
-        JavaTestKit.shutdownActorSystem(followerSystem);
     }
 
     private void initEmptyDatastore(final String type) {
@@ -112,90 +108,44 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
         leaderDistributedDataStore =
                 leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
 
-        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
-        followerDistributedDataStore =
-                followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
-
         leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
-                Mockito.mock(DistributedDataStore.class),
+                leaderDistributedDataStore,
                 leaderDistributedDataStore);
-
-        followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
-                Mockito.mock(DistributedDataStore.class),
-                followerDistributedDataStore);
     }
 
     @Test
-    public void testProducerRegistrations() throws Exception {
+    public void testWritesIntoDefaultShard() throws Exception {
         initEmptyDatastore("config");
 
-        final DistributedShardRegistration shardRegistration =
-                leaderShardFactory.createDistributedShard(TEST_ID,
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        leaderShardFactory.createDistributedShard(TEST_ID,
+                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
-                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+                ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
 
-        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+        final DOMDataTreeIdentifier configRoot =
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
 
-        leaderShardManager.tell(
-                new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef());
-        leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalShardFound.class);
-
-        IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(),
-                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
-
-        leaderShardManager.tell(
-                new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef());
-        leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalPrimaryShardFound.class);
-
-        final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
-        followerShardManager.tell(
-                new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
-        followerTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), RemotePrimaryShardFound.class);
-
-        final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
-        try {
-            followerShardFactory.createProducer(Collections.singleton(TEST_ID));
-            fail("Producer should be already registered on the other node");
-        } catch (final IllegalArgumentException e) {
-            assertTrue(e.getMessage().contains("is attached to producer"));
-        }
-
-        producer.close();
-
-        final DOMDataTreeProducer followerProducer =
-                followerShardFactory.createProducer(Collections.singleton(TEST_ID));
-        try {
-            leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
-            fail("Producer should be already registered on the other node");
-        } catch (final IllegalArgumentException e) {
-            assertTrue(e.getMessage().contains("is attached to producer"));
-        }
+        final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(configRoot));
 
-        followerProducer.close();
-        // try to create a shard on an already registered prefix on follower
-        try {
-            followerShardFactory.createDistributedShard(TEST_ID,
-                    Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
-            fail("This prefix already should have a shard registration that was forwarded from the other node");
-        } catch (final DOMDataTreeShardingConflictException e) {
-            assertTrue(e.getMessage().contains("is already occupied by shard"));
-        }
+        final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
+        final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
+        Assert.assertNotNull(cursor);
     }
 
     @Test
-    @Ignore("Needs some other stuff related to 5280")
-    public void testWriteIntoMultipleShards() throws Exception {
+    public void testSingleNodeWrites() throws Exception {
         initEmptyDatastore("config");
 
-        final DistributedShardRegistration shardRegistration =
-                leaderShardFactory.createDistributedShard(
-                        TEST_ID,Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+        leaderShardFactory.createDistributedShard(TEST_ID,
+                Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
 
+        final DistributedShardRegistration shardRegistration =
+                leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME));
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
 
+        LOG.warn("Got after waiting for nonleader");
         final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
 
         new JavaTestKit(leaderSystem) {
@@ -204,12 +154,6 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
                         new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
                 expectMsgClass(duration("5 seconds"), LocalShardFound.class);
 
-                final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
-
-                followerShardManager.tell(
-                        new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
                 leaderDistributedDataStore.getActorContext().getShardManager().tell(
                         new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
                 expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
@@ -224,12 +168,136 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
         final YangInstanceIdentifier nameId =
                 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
         cursor.write(nameId.getLastPathArgument(),
-                ImmutableLeafNodeBuilder.<String>create()
-                        .withNodeIdentifier(new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
+                ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
+                        new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
 
         cursor.close();
-        tx.submit();
+        LOG.warn("Got to pre submit");
 
+        tx.submit().checkedGet();
+    }
+
+    @Test
+    public void testMultipleWritesIntoSingleMapEntry() throws Exception {
+        initEmptyDatastore("config");
+
+        final DistributedShardRegistration shardRegistration =
+                leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME));
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+        LOG.warn("Got after waiting for nonleader");
+        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
 
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+        final YangInstanceIdentifier oid1 = TestModel.OUTER_LIST_PATH.node(new NodeIdentifierWithPredicates(
+                TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), 0));
+        final DOMDataTreeIdentifier outerListPath = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1);
+
+        final DistributedShardRegistration outerListShardReg = leaderShardFactory.createDistributedShard(outerListPath,
+                Lists.newArrayList(AbstractTest.MEMBER_NAME));
+
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(outerListPath.getRootIdentifier()));
+
+        final DOMDataTreeProducer shardProducer = leaderShardFactory.createProducer(
+                Collections.singletonList(outerListPath));
+
+        final DOMDataTreeCursorAwareTransaction tx = shardProducer.createTransaction(false);
+        final DOMDataTreeWriteCursor cursor =
+                tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1));
+        assertNotNull(cursor);
+
+        MapNode innerList = ImmutableMapNodeBuilder
+                .create()
+                .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME))
+                .build();
+
+        cursor.write(new NodeIdentifier(TestModel.INNER_LIST_QNAME), innerList);
+        cursor.close();
+        tx.submit().checkedGet();
+
+        final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            final Collection<MapEntryNode> innerListMapEntries = createInnerListMapEntries(1000, "run-" + i);
+            for (final MapEntryNode innerListMapEntry : innerListMapEntries) {
+                final DOMDataTreeCursorAwareTransaction tx1 = shardProducer.createTransaction(false);
+                final DOMDataTreeWriteCursor cursor1 = tx1.createCursor(
+                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
+                                oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME))));
+                cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry);
+                cursor1.close();
+                futures.add(tx1.submit());
+            }
+        }
+
+        futures.get(futures.size() - 1).checkedGet();
+
+    }
+
+    private static Collection<MapEntryNode> createInnerListMapEntries(final int amount, final String valuePrefix) {
+        final Collection<MapEntryNode> ret = new ArrayList<>();
+        for (int i = 0; i < amount; i++) {
+            ret.add(ImmutableNodes.mapEntryBuilder()
+                    .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.INNER_LIST_QNAME,
+                            QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i)))
+                    .withChild(ImmutableNodes
+                            .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i))
+                    .build());
+        }
+
+        return ret;
+    }
+
+    @Test
+    public void testDistributedData() throws Exception {
+        initEmptyDatastore("config");
+
+        leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME));
+        leaderShardFactory.createDistributedShard(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
+                Lists.newArrayList(AbstractTest.MEMBER_NAME));
+        leaderShardFactory.createDistributedShard(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
+                Lists.newArrayList(AbstractTest.MEMBER_NAME));
+        leaderShardFactory.createDistributedShard(
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
+                Lists.newArrayList(AbstractTest.MEMBER_NAME));
+
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+    }
+
+    @Test
+    public void testMultipleRegistrationsAtOnePrefix() throws Exception {
+        initEmptyDatastore("config");
+
+        for (int i = 0; i < 10; i++) {
+            LOG.debug("Round {}", i);
+            final DistributedShardRegistration reg1 = leaderShardFactory
+                    .createDistributedShard(TEST_ID,
+                            Lists.newArrayList(AbstractTest.MEMBER_NAME));
+
+            leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+            assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+            reg1.close();
+
+            waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+                    ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+        }
     }
-}
+}
\ No newline at end of file
index 48b7e9b..cd8a754 100644 (file)
@@ -44,6 +44,7 @@ public class TestModel {
     public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
     public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
     public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+    public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
     public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
     private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
 
index bc9a86b..a68a0a8 100644 (file)
@@ -35,6 +35,67 @@ bounded-mailbox {
   mailbox-push-timeout-time = 100ms
 }
 
+test {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
+
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
+
+  akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+
+    loglevel = "INFO"
+
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+
+      serializers {
+          java = "akka.serialization.JavaSerializer"
+          proto = "akka.remote.serialization.ProtobufSerializer"
+          readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
+      }
+
+      serialization-bindings {
+          "com.google.protobuf.Message" = proto
+          "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
+      }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      artery {
+        enabled = on
+        canonical.hostname = "127.0.0.1"
+        canonical.port = 2565
+      }
+
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2565
+      }
+    }
+
+    cluster {
+      auto-down-unreachable-after = 100s
+      retry-unsuccessful-join-after = 100ms
+
+      roles = [
+        "member-1"
+      ]
+    }
+  }
+}
+
 Member1 {
   bounded-mailbox {
     mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
index 4cd5b22..6cf580c 100644 (file)
@@ -8,3 +8,4 @@ org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker=debug
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker.actors.dds=debug
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.sharding=debug

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.