BUG-7965 Switch distributed-data backend to a separate shard 09/50609/46
authorTomas Cere <tcere@cisco.com>
Tue, 10 Jan 2017 11:44:57 +0000 (12:44 +0100)
committerJakub Morvay <jmorvay@cisco.com>
Fri, 31 Mar 2017 08:11:55 +0000 (10:11 +0200)
The shard needs to be present on all nodes and replicated across
the cluster. Making this into shard allows us to leverage the current
datastore api's and also persistence so we have the sharding layout
persisted.

The shard is started on all nodes once DistributedShardedDOMDataTree is
created.

Change-Id: I697be9b7134a27720e23e3e56f9fddc71301ec1e
Signed-off-by: Tomas Cere <tcere@cisco.com>
27 files changed:
opendaylight/md-sal/sal-common-util/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AbstractShardManagerCreator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/LookupTask.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigWriter.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/InitConfigListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/LookupPrefixShard.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java with 64% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemovalLookup.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java with 70% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/StartConfigShardLookup.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/prefix-shard-configuration.yang [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SchemaContextHelper.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/empty-modules.conf [new file with mode: 0644]

index 610366c..7065bd8 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-util</artifactId>
+    </dependency>
   </dependencies>
 
   <scm>
index dce5368..c0bb866 100644 (file)
@@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXB
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -94,7 +95,8 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
                 .datastoreContextFactory(datastoreContextFactory)
                 .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
                 .primaryShardInfoCache(primaryShardInfoCache)
-                .restoreFromSnapshot(restoreFromSnapshot);
+                .restoreFromSnapshot(restoreFromSnapshot)
+                .distributedDataStore(this);
 
         actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
                 shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
@@ -319,4 +321,19 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         return (ListenerRegistration<L>) listenerRegistrationProxy;
     }
 
+    @SuppressWarnings("unchecked")
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
+            final YangInstanceIdentifier internalPath,
+            final DOMDataTreeChangeListener delegate) {
+        Preconditions.checkNotNull(delegate, "delegate should not be null");
+
+        LOG.debug("Registering a listener for the configuration shard: {}", internalPath);
+
+        final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
+                new DataTreeChangeListenerProxy<>(actorContext, delegate, internalPath);
+        proxy.init(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
+
+        return (ListenerRegistration<L>) proxy;
+    }
+
 }
index 6b79d54..629418f 100644 (file)
@@ -8,19 +8,67 @@
 
 package org.opendaylight.controller.cluster.datastore.config;
 
-import akka.cluster.ddata.ReplicatedData;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 
 /**
  * Configuration for prefix based shards.
  */
-public class PrefixShardConfiguration implements ReplicatedData, Serializable {
+public class PrefixShardConfiguration implements Serializable {
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private PrefixShardConfiguration prefixShardConfiguration;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(final PrefixShardConfiguration prefixShardConfiguration) {
+            this.prefixShardConfiguration = prefixShardConfiguration;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput objectOutput) throws IOException {
+            objectOutput.writeObject(prefixShardConfiguration.getPrefix());
+            objectOutput.writeObject(prefixShardConfiguration.getShardStrategyName());
+
+            objectOutput.writeInt(prefixShardConfiguration.getShardMemberNames().size());
+            for (MemberName name : prefixShardConfiguration.getShardMemberNames()) {
+                name.writeTo(objectOutput);
+            }
+        }
+
+        @Override
+        public void readExternal(final ObjectInput objectInput) throws IOException, ClassNotFoundException {
+            final DOMDataTreeIdentifier prefix =  (DOMDataTreeIdentifier) objectInput.readObject();
+            final String strategyName = (String) objectInput.readObject();
+
+            final int size = objectInput.readInt();
+            final Collection<MemberName> shardMemberNames = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                shardMemberNames.add(MemberName.readFrom(objectInput));
+            }
+
+            prefixShardConfiguration = new PrefixShardConfiguration(prefix, strategyName, shardMemberNames);
+        }
+
+        private Object readResolve() {
+            return prefixShardConfiguration;
+        }
+    }
+
     private static final long serialVersionUID = 1L;
 
     private final DOMDataTreeIdentifier prefix;
@@ -57,23 +105,7 @@ public class PrefixShardConfiguration implements ReplicatedData, Serializable {
                 + '}';
     }
 
-    public String toDataMapKey() {
-        return "prefix=" + prefix;
-    }
-
-    @Override
-    public ReplicatedData merge(final ReplicatedData replicatedData) {
-        if (!(replicatedData instanceof PrefixShardConfiguration)) {
-            throw new IllegalStateException("replicatedData expected to be instance of PrefixShardConfiguration");
-        }
-        final PrefixShardConfiguration entry = (PrefixShardConfiguration) replicatedData;
-        if (!entry.getPrefix().equals(prefix)) {
-            // this should never happen since the key is the prefix
-            // if it does just return current?
-            return this;
-        }
-        final HashSet<MemberName> members = new HashSet<>(shardMemberNames);
-        members.addAll(entry.getShardMemberNames());
-        return new PrefixShardConfiguration(prefix, shardStrategyName, members);
+    private Object writeReplace() {
+        return new Proxy(this);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java
deleted file mode 100644 (file)
index 2ac883b..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-/**
- * A message sent to the ShardManager to dynamically add a new local shard
- * that is a replica for an existing shard that is already available in the
- * cluster.
- */
-
-public class AddPrefixShardReplica {
-
-    private final YangInstanceIdentifier prefix;
-
-    public AddPrefixShardReplica(final YangInstanceIdentifier prefix) {
-        this.prefix = Preconditions.checkNotNull(prefix);
-    }
-
-    public YangInstanceIdentifier getPrefix() {
-        return prefix;
-    }
-
-    @Override
-    public String toString() {
-        return "AddPrefixShardReplica[ShardName=" + prefix + "]";
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java
deleted file mode 100644 (file)
index 9a87a50..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import org.opendaylight.controller.cluster.datastore.DatastoreContext;
-import org.opendaylight.controller.cluster.datastore.Shard;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-
-/**
- * Message sent to the ShardManager to create a shard located at a certain logical position in the dataTree.
- */
-public class CreatePrefixedShard {
-
-    private final PrefixShardConfiguration config;
-    private final DatastoreContext context;
-    private final Shard.Builder builder;
-
-    public CreatePrefixedShard(final PrefixShardConfiguration config,
-                               final DatastoreContext context,
-                               final Shard.Builder builder) {
-        this.config = config;
-        this.context = context;
-        this.builder = builder;
-    }
-
-    public PrefixShardConfiguration getConfig() {
-        return config;
-    }
-
-    public DatastoreContext getContext() {
-        return context;
-    }
-
-    public Shard.Builder getShardBuilder() {
-        return builder;
-    }
-}
index 7dbd669..4dd409e 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.shardmanager;
 import akka.actor.Props;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.CountDownLatch;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
@@ -20,6 +21,7 @@ public abstract class AbstractShardManagerCreator<T extends AbstractShardManager
     private ClusterWrapper cluster;
     private Configuration configuration;
     private DatastoreContextFactory datastoreContextFactory;
+    private AbstractDataStore distributedDataStore;
     private CountDownLatch waitTillReadyCountDownLatch;
     private PrimaryShardInfoFutureCache primaryShardInfoCache;
     private DatastoreSnapshot restoreFromSnapshot;
@@ -62,9 +64,19 @@ public abstract class AbstractShardManagerCreator<T extends AbstractShardManager
         return datastoreContextFactory;
     }
 
-    public T datastoreContextFactory(DatastoreContextFactory newDatastoreContextFactory) {
+    public T datastoreContextFactory(final DatastoreContextFactory newDatastoreContextFactory) {
         checkSealed();
-        this.datastoreContextFactory = newDatastoreContextFactory;
+        this.datastoreContextFactory = Preconditions.checkNotNull(newDatastoreContextFactory);
+        return self();
+    }
+
+    AbstractDataStore getDistributedDataStore() {
+        return distributedDataStore;
+    }
+
+    public T distributedDataStore(final AbstractDataStore distributedDataStore) {
+        checkSealed();
+        this.distributedDataStore = distributedDataStore;
         return self();
     }
 
@@ -103,6 +115,7 @@ public abstract class AbstractShardManagerCreator<T extends AbstractShardManager
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+        Preconditions.checkNotNull(distributedDataStore, "distributedDataStore should not be null");
         Preconditions.checkNotNull(waitTillReadyCountDownLatch, "waitTillReadyCountdownLatch should not be null");
         Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
     }
index 0596872..5bfa185 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
-import static akka.actor.ActorRef.noSender;
 import static akka.pattern.Patterns.ask;
 
 import akka.actor.ActorRef;
@@ -22,10 +21,6 @@ import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
 import akka.cluster.ClusterEvent.MemberWeaklyUp;
 import akka.cluster.Member;
-import akka.cluster.ddata.DistributedData;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.Replicator.Changed;
-import akka.cluster.ddata.Replicator.Subscribe;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
@@ -38,8 +33,6 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -54,9 +47,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
@@ -71,10 +64,8 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
-import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
@@ -109,8 +100,14 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -165,8 +162,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
     private final String persistenceId;
+    private final AbstractDataStore dataStore;
 
-    private final ActorRef replicator;
+    private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
+    private PrefixedShardConfigUpdateHandler configUpdateHandler;
 
     ShardManager(AbstractShardManagerCreator<?> builder) {
         this.cluster = builder.getCluster();
@@ -192,16 +191,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
         shardManagerMBean.registerMBean();
 
-        replicator = DistributedData.get(context().system()).replicator();
-
+        dataStore = builder.getDistributedDataStore();
     }
 
+    @Override
     public void preStart() {
-        LOG.info("Starting Shardmanager {}", persistenceId);
-
-        final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
-                new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
-        replicator.tell(subscribe, noSender());
+        LOG.info("Starting ShardManager {}", persistenceId);
     }
 
     @Override
@@ -209,6 +204,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.info("Stopping ShardManager {}", persistenceId());
 
         shardManagerMBean.unregisterMBean();
+
+        if (configListenerReg != null) {
+            configListenerReg.close();
+            configListenerReg = null;
+        }
     }
 
     @Override
@@ -249,10 +249,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onCreateShard((CreateShard)message);
         } else if (message instanceof AddShardReplica) {
             onAddShardReplica((AddShardReplica) message);
-        } else if (message instanceof CreatePrefixedShard) {
-            onCreatePrefixedShard((CreatePrefixedShard) message);
-        } else if (message instanceof AddPrefixShardReplica) {
-            onAddPrefixShardReplica((AddPrefixShardReplica) message);
+        } else if (message instanceof PrefixShardCreated) {
+            onPrefixShardCreated((PrefixShardCreated) message);
+        } else if (message instanceof PrefixShardRemoved) {
+            onPrefixShardRemoved((PrefixShardRemoved) message);
+        } else if (message instanceof InitConfigListener) {
+            onInitConfigListener();
         } else if (message instanceof ForwardedAddServerReply) {
             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
@@ -283,13 +285,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onGetLocalShardIds();
         } else if (message instanceof RunnableMessage) {
             ((RunnableMessage)message).run();
-        } else if (message instanceof Changed) {
-            onConfigChanged((Changed) message);
         } else {
             unknownMessage(message);
         }
     }
 
+    private void onInitConfigListener() {
+        LOG.debug("{}: Initializing config listener.", persistenceId());
+
+        final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+                org.opendaylight.mdsal.common.api.LogicalDatastoreType
+                        .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
+
+        if (configUpdateHandler != null) {
+            configUpdateHandler.close();
+        }
+
+        configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
+        configUpdateHandler.initListener(dataStore, type);
+    }
+
     private void onShutDown() {
         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
         for (ShardInformation info : localShards.values()) {
@@ -340,88 +355,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
-        LOG.debug("{}, ShardManager {} received config changed {}",
-                cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries());
-
-        final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
-
-        final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
-                changedConfig.values().stream().collect(
-                        Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity()));
-
-        resolveConfig(newConfig);
-    }
-
-    private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
-        LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}",
-                cluster.getCurrentMemberName(), persistenceId, newConfig);
-
-        newConfig.forEach((prefix, config) ->
-                LOG.debug("{} ShardManager : {}, received shard config "
-                        + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config));
-
-        final SetView<DOMDataTreeIdentifier> removedConfigs =
-                Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet());
-
-        // resolve removals
-
-        resolveRemovals(removedConfigs);
-
-        final SetView<DOMDataTreeIdentifier> addedConfigs =
-                Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet());
-        // resolve additions
-
-        resolveAdditions(addedConfigs, newConfig);
-        // iter through to update existing shards, either start/stop replicas or update the shard
-        // to check for more peers
-        resolveUpdates(Collections.emptySet());
-    }
-
-    private void resolveRemovals(final Set<DOMDataTreeIdentifier> removedConfigs) {
-        LOG.debug("{} ShardManager : {}, resolving removed configs : {}",
-                cluster.getCurrentMemberName(), persistenceId, removedConfigs);
-
-        removedConfigs.forEach(id -> doRemovePrefixedShard(id));
-    }
-
-    private void resolveAdditions(final Set<DOMDataTreeIdentifier> addedConfigs,
-                                  final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
-        LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs);
-
-        addedConfigs.stream().filter(identifier
-            -> identifier
-            .getDatastoreType().equals(
-                    ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType())))
-            .forEach(id -> doCreatePrefixedShard(configs.get(id)));
-    }
-
-    private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
-        LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs);
-    }
-
-    private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) {
-        LOG.debug("{} ShardManager : {}, removing prefix shard: {}",
-                cluster.getCurrentMemberName(), persistenceId, prefix);
-        final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
-        final ShardInformation shard = localShards.remove(shardId.getShardName());
-
-        configuration.removePrefixShardConfiguration(prefix);
-
-        if (shard == null) {
-            LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix);
-            return;
-        }
-
-        if (shard.getActor() != null) {
-            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
-            shard.getActor().tell(Shutdown.INSTANCE, self());
-        }
-        LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(),
-                persistenceId(), shardId.getShardName());
-        persistShardList();
-    }
-
     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
             String leaderPath) {
         shardReplicaOperationsInProgress.remove(shardId.getShardName());
@@ -524,31 +457,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
-        LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
-
-        Object reply;
-        try {
-            final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
-                    createPrefixedShard.getConfig().getPrefix());
-            if (localShards.containsKey(shardId.getShardName())) {
-                LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
-                reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
-            } else {
-                doCreatePrefixedShard(createPrefixedShard);
-                reply = new Status.Success(null);
-            }
-        } catch (final Exception e) {
-            LOG.error("{}: onCreateShard failed", persistenceId(), e);
-            reply = new Status.Failure(e);
-        }
-
-        if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
-            getSender().tell(reply, getSelf());
-        }
-    }
-
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void onCreateShard(CreateShard createShard) {
         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
@@ -573,13 +481,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
-        doCreatePrefixedShard(createPrefixedShard.getConfig());
-        // do not replicate on this level
-    }
+    private void onPrefixShardCreated(final PrefixShardCreated message) {
+        LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
 
-    private void doCreatePrefixedShard(final PrefixShardConfiguration config) {
-        LOG.debug("doCreatePrefixShard : {}", config.getPrefix());
+        final PrefixShardConfiguration config = message.getConfiguration();
 
         final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
                 config.getPrefix());
@@ -596,6 +501,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
         }
 
+        doCreatePrefixShard(config, shardId, shardName);
+    }
+
+    private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
         configuration.addPrefixShardConfiguration(config);
 
         final Builder builder = newShardDatastoreContextBuilder(shardName);
@@ -605,10 +514,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         final Map<String, String> peerAddresses = Collections.emptyMap();
         final boolean isActiveMember = true;
-        LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: "
-                        + "{}, peerAddresses: {}, isActiveMember: {}",
-                shardId, persistenceId(), config.getShardMemberNames(),
-                peerAddresses, isActiveMember);
+
+        LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+                persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
 
         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
                 shardDatastoreContext, Shard.builder(), peerAddressResolver);
@@ -618,7 +526,28 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if (schemaContext != null) {
             info.setActor(newShardActor(schemaContext, info));
         }
+    }
+
+    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+        LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
+
+        final DOMDataTreeIdentifier prefix = message.getPrefix();
+        final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
+        final ShardInformation shard = localShards.remove(shardId.getShardName());
+
+        configuration.removePrefixShardConfiguration(prefix);
+
+        if (shard == null) {
+            LOG.warn("{}: Received removal for unconfigured shard: {}, ignoring.. ", persistenceId(), prefix);
+            return;
+        }
 
+        if (shard.getActor() != null) {
+            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
+            shard.getActor().tell(Shutdown.INSTANCE, self());
+        }
+
+        LOG.debug("{}: Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
         persistShardList();
     }
 
@@ -1259,38 +1188,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return false;
     }
 
-
-    // With this message the shard does NOT have to be preconfigured
-    // do a dynamic lookup if the shard exists somewhere and replicate
-    private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
-        final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
-
-        LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
-
-        if (schemaContext == null) {
-            final String msg = String.format(
-                    "No SchemaContext is available in order to create a local shard instance for %s", shardName);
-            LOG.debug("{}: {}", persistenceId(), msg);
-            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
-            return;
-        }
-
-        findPrimary(shardName,
-                new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
-                    @Override
-                    public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
-                        getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
-                                getTargetActor());
-                    }
-
-                    @Override
-                    public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
-                        sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
-                    }
-                }
-        );
-    }
-
     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
@@ -1584,9 +1481,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void findLocalShard(FindLocalShard message) {
+        LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
+
         final ShardInformation shardInformation = localShards.get(message.getShardName());
 
         if (shardInformation == null) {
+            LOG.debug("{}: Local shard {} not found - shards present: {}",
+                    persistenceId(), message.getShardName(), localShards.keySet());
+
             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
             return;
         }
index 607e78c..2fb9cf1 100644 (file)
@@ -8,30 +8,41 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
-import akka.cluster.ddata.Key;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.ORMapKey;
 import java.util.Map;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utils for encoding prefix shard name.
  */
 public class ClusterUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterUtils.class);
 
-    // key for replicated configuration key
-    public static final Key<ORMap<PrefixShardConfiguration>> CONFIGURATION_KEY =
-            ORMapKey.create("prefix-shard-configuration-config");
+    // id for the shard used to store prefix configuration
+    public static final String PREFIX_CONFIG_SHARD_ID = "prefix-configuration-shard";
 
-    public static final Key<ORMap<PrefixShardConfiguration>> OPERATIONAL_KEY =
-            ORMapKey.create("prefix-shard-configuration-oper");
+    public static final QName PREFIX_SHARDS_QNAME =
+            QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:clustering:prefix-shard-configuration",
+                    "2017-01-10", "prefix-shards").intern();
+    public static final QName SHARD_LIST_QNAME =
+            QName.create(PREFIX_SHARDS_QNAME, "shard").intern();
+    public static final QName SHARD_PREFIX_QNAME =
+            QName.create(PREFIX_SHARDS_QNAME, "prefix").intern();
+    public static final QName SHARD_REPLICAS_QNAME =
+            QName.create(PREFIX_SHARDS_QNAME, "replicas").intern();
+    public static final QName SHARD_REPLICA_QNAME =
+            QName.create(PREFIX_SHARDS_QNAME, "replica").intern();
+
+    public static final YangInstanceIdentifier PREFIX_SHARDS_PATH =
+            YangInstanceIdentifier.of(PREFIX_SHARDS_QNAME).toOptimized();
+    public static final YangInstanceIdentifier SHARD_LIST_PATH =
+            PREFIX_SHARDS_PATH.node(SHARD_LIST_QNAME).toOptimized();
 
     public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) {
         final String type;
@@ -44,6 +55,7 @@ public class ClusterUtils {
                 break;
             default:
                 type = prefix.getDatastoreType().name();
+                LOG.warn("Unknown data store type {}", type);
         }
 
         return ShardIdentifier.create(getCleanShardName(prefix.getRootIdentifier()), memberName, type);
@@ -80,17 +92,4 @@ public class ClusterUtils {
         });
         return builder.toString();
     }
-
-    public static Key<ORMap<PrefixShardConfiguration>> getReplicatorKey(LogicalDatastoreType type) {
-        if (LogicalDatastoreType.CONFIGURATION.equals(type)) {
-            return CONFIGURATION_KEY;
-        } else {
-            return OPERATIONAL_KEY;
-        }
-    }
-
-    public static org.opendaylight.mdsal.common.api.LogicalDatastoreType toMDSalApi(
-            final LogicalDatastoreType logicalDatastoreType) {
-        return org.opendaylight.mdsal.common.api.LogicalDatastoreType.valueOf(logicalDatastoreType.name());
-    }
 }
index 53411a9..f6060a8 100644 (file)
@@ -61,7 +61,7 @@ class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard {
     @Override
     public synchronized DOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> paths) {
         for (final DOMDataTreeIdentifier prodPrefix : paths) {
-            Preconditions.checkArgument(paths.contains(prodPrefix), "Prefix %s is not contained under shard root",
+            Preconditions.checkArgument(shardRoot.contains(prodPrefix), "Prefix %s is not contained under shard root",
                     prodPrefix, paths);
         }
 
index 69dfe4c..30b46e5 100644 (file)
@@ -14,40 +14,52 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.cluster.Cluster;
-import akka.cluster.Member;
 import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
-import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
@@ -63,13 +75,15 @@ import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
 import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.prefix.shard.configuration.rev170110.PrefixShards;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
 import scala.compat.java8.FutureConverters;
+import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -84,9 +98,9 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     private static final int MAX_ACTOR_CREATION_RETRIES = 100;
     private static final int ACTOR_RETRY_DELAY = 100;
     private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
-    static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = new FiniteDuration(
-                    ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * 3,
-                    TimeUnit.SECONDS);
+    private static final int LOOKUP_TASK_MAX_RETRIES = 100;
+    static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION =
+            new FiniteDuration(LOOKUP_TASK_MAX_RETRIES * LOOKUP_TASK_MAX_RETRIES * 3, TimeUnit.SECONDS);
     static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION);
 
     static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";
@@ -99,12 +113,21 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     private final ActorRef shardedDataTreeActor;
     private final MemberName memberName;
 
+    @GuardedBy("shards")
     private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
             DOMDataTreePrefixTable.create();
 
     private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
             new EnumMap<>(LogicalDatastoreType.class);
 
+    private final EnumMap<LogicalDatastoreType, Entry<DataStoreClient, ActorRef>> configurationShardMap =
+            new EnumMap<>(LogicalDatastoreType.class);
+
+    private final EnumMap<LogicalDatastoreType, PrefixedShardConfigWriter> writerMap =
+            new EnumMap<>(LogicalDatastoreType.class);
+
+    private final PrefixedShardConfigUpdateHandler updateHandler;
+
     public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
                                          final DistributedDataStore distributedOperDatastore,
                                          final DistributedDataStore distributedConfigDatastore) {
@@ -119,27 +142,141 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
                         .setActorSystem(actorSystem)
                         .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
                         .setDistributedConfigDatastore(distributedConfigDatastore)
-                        .setDistributedOperDatastore(distributedOperDatastore),
+                        .setDistributedOperDatastore(distributedOperDatastore)
+                        .setLookupTaskMaxRetries(LOOKUP_TASK_MAX_RETRIES),
                 ACTOR_ID);
 
         this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();
 
+        updateHandler = new PrefixedShardConfigUpdateHandler(shardedDataTreeActor,
+                distributedConfigDatastore.getActorContext().getCurrentMemberName());
+
+        LOG.debug("{} - Starting prefix configuration shards", memberName);
+        createPrefixConfigShard(distributedConfigDatastore);
+        createPrefixConfigShard(distributedOperDatastore);
+    }
+
+    private void createPrefixConfigShard(final DistributedDataStore dataStore) {
+        Configuration configuration = dataStore.getActorContext().getConfiguration();
+        Collection<MemberName> memberNames = configuration.getUniqueMemberNamesForAllShards();
+        CreateShard createShardMessage =
+                new CreateShard(new ModuleShardConfiguration(PrefixShards.QNAME.getNamespace(),
+                        "prefix-shard-configuration", ClusterUtils.PREFIX_CONFIG_SHARD_ID, ModuleShardStrategy.NAME,
+                        memberNames),
+                        Shard.builder(), dataStore.getActorContext().getDatastoreContext());
+
+        dataStore.getActorContext().getShardManager().tell(createShardMessage, noSender());
+    }
+
+    /**
+     * This will try to initialize prefix configuration shards upon their
+     * successful start. We need to create writers to these shards, so we can
+     * satisfy future {@link #createDistributedShard} and
+     * {@link #resolveShardAdditions} requests and update prefix configuration
+     * shards accordingly.
+     *
+     * <p>
+     * We also need to initialize listeners on these shards, so we can react
+     * on changes made on them by other cluster members or even by ourselves.
+     *
+     * <p>
+     * Finally, we need to be sure that default shards for both operational and
+     * configuration data stores are up and running and we have distributed
+     * shards frontend created for them.
+     */
+    void init() {
+        // create our writers to the configuration
+        try {
+            LOG.debug("{} - starting config shard lookup.",
+                    distributedConfigDatastore.getActorContext().getCurrentMemberName());
+
+            // We have to wait for prefix config shards to be up and running
+            // so we can create datastore clients for them
+            handleConfigShardLookup().get(SHARD_FUTURE_TIMEOUT_DURATION.length(), SHARD_FUTURE_TIMEOUT_DURATION.unit());
+
+            LOG.debug("Prefix configuration shards ready - creating clients");
+
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new IllegalStateException("Prefix config shards not found", e);
+        }
+
+        try {
+            LOG.debug("Prefix configuration shards ready - creating clients");
+            configurationShardMap.put(LogicalDatastoreType.CONFIGURATION,
+                    createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
+                            distributedConfigDatastore.getActorContext()));
+        } catch (final DOMDataTreeShardCreationFailedException e) {
+            throw new IllegalStateException(
+                    "Unable to create datastoreClient for config DS prefix configuration shard.", e);
+        }
+
+        try {
+            configurationShardMap.put(LogicalDatastoreType.OPERATIONAL,
+                    createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
+                            distributedOperDatastore.getActorContext()));
+
+        } catch (final DOMDataTreeShardCreationFailedException e) {
+            throw new IllegalStateException(
+                        "Unable to create datastoreClient for oper DS prefix configuration shard.", e);
+        }
+
+        writerMap.put(LogicalDatastoreType.CONFIGURATION, new PrefixedShardConfigWriter(
+                configurationShardMap.get(LogicalDatastoreType.CONFIGURATION).getKey()));
+
+        writerMap.put(LogicalDatastoreType.OPERATIONAL, new PrefixedShardConfigWriter(
+                configurationShardMap.get(LogicalDatastoreType.OPERATIONAL).getKey()));
+
+        updateHandler.initListener(distributedConfigDatastore, LogicalDatastoreType.CONFIGURATION);
+        updateHandler.initListener(distributedOperDatastore, LogicalDatastoreType.OPERATIONAL);
+
+        distributedConfigDatastore.getActorContext().getShardManager().tell(InitConfigListener.INSTANCE, noSender());
+        distributedOperDatastore.getActorContext().getShardManager().tell(InitConfigListener.INSTANCE, noSender());
+
+
         //create shard registration for DEFAULT_SHARD
         try {
             defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
                     initDefaultShard(LogicalDatastoreType.CONFIGURATION));
         } catch (final InterruptedException | ExecutionException e) {
-            LOG.error("Unable to create default shard frontend for config shard", e);
+            throw new IllegalStateException("Unable to create default shard frontend for config shard", e);
         }
 
         try {
             defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
                     initDefaultShard(LogicalDatastoreType.OPERATIONAL));
         } catch (final InterruptedException | ExecutionException e) {
-            LOG.error("Unable to create default shard frontend for operational shard", e);
+            throw new IllegalStateException("Unable to create default shard frontend for operational shard", e);
         }
     }
 
+    private ListenableFuture<List<Void>> handleConfigShardLookup() {
+
+        final ListenableFuture<Void> configFuture = lookupConfigShard(LogicalDatastoreType.CONFIGURATION);
+        final ListenableFuture<Void> operFuture = lookupConfigShard(LogicalDatastoreType.OPERATIONAL);
+
+        return Futures.allAsList(configFuture, operFuture);
+    }
+
+    private ListenableFuture<Void> lookupConfigShard(final LogicalDatastoreType type) {
+        final SettableFuture<Void> future = SettableFuture.create();
+
+        final Future<Object> ask =
+                Patterns.ask(shardedDataTreeActor, new StartConfigShardLookup(type), SHARD_FUTURE_TIMEOUT);
+
+        ask.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable throwable, final Object result) throws Throwable {
+                if (throwable != null) {
+                    future.setException(throwable);
+                } else {
+                    future.set(null);
+                }
+            }
+        }, actorSystem.dispatcher());
+
+        return future;
+    }
+
     @Nonnull
     @Override
     public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
@@ -176,34 +313,54 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     public CompletionStage<DistributedShardRegistration> createDistributedShard(
             final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
             throws DOMDataTreeShardingConflictException {
-        final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
-                shards.lookup(prefix);
-        if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
-            throw new DOMDataTreeShardingConflictException(
-                    "Prefix " + prefix + " is already occupied by another shard.");
+
+        synchronized (shards) {
+            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
+                    shards.lookup(prefix);
+            if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
+                throw new DOMDataTreeShardingConflictException(
+                        "Prefix " + prefix + " is already occupied by another shard.");
+            }
         }
 
-        final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
+        final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType());
+
+        final ListenableFuture<Void> writeFuture =
+                writer.writeConfig(prefix.getRootIdentifier(), replicaMembers);
+
+        final Promise<DistributedShardRegistration> shardRegistrationPromise = akka.dispatch.Futures.promise();
+        Futures.addCallback(writeFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(@Nullable final Void result) {
+
+                final Future<Object> ask =
+                        Patterns.ask(shardedDataTreeActor, new LookupPrefixShard(prefix), SHARD_FUTURE_TIMEOUT);
+
+                shardRegistrationPromise.completeWith(ask.transform(
+                        new Mapper<Object, DistributedShardRegistration>() {
+                            @Override
+                            public DistributedShardRegistration apply(final Object parameter) {
+                                return new DistributedShardRegistrationImpl(
+                                        prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
+                            }
+                        },
+                        new Mapper<Throwable, Throwable>() {
+                            @Override
+                            public Throwable apply(final Throwable throwable) {
+                                return new DOMDataTreeShardCreationFailedException(
+                                        "Unable to create a cds shard.", throwable);
+                            }
+                        }, actorSystem.dispatcher()));
+            }
 
-        final Future<Object> ask =
-                Patterns.ask(shardedDataTreeActor, new CreatePrefixShard(config), SHARD_FUTURE_TIMEOUT);
-
-        final Future<DistributedShardRegistration> shardRegistrationFuture = ask.transform(
-                new Mapper<Object, DistributedShardRegistration>() {
-                    @Override
-                    public DistributedShardRegistration apply(final Object parameter) {
-                        return new DistributedShardRegistrationImpl(
-                                prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
-                    }
-                },
-                new Mapper<Throwable, Throwable>() {
-                    @Override
-                    public Throwable apply(final Throwable throwable) {
-                        return new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable);
-                    }
-                }, actorSystem.dispatcher());
-
-        return FutureConverters.toJava(shardRegistrationFuture);
+            @Override
+            public void onFailure(final Throwable throwable) {
+                shardRegistrationPromise.failure(
+                        new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable));
+            }
+        });
+
+        return FutureConverters.toJava(shardRegistrationPromise.future());
     }
 
     void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
@@ -247,9 +404,14 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
             @SuppressWarnings("unchecked")
             final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
                     (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
-            shards.store(prefix, reg);
+
+            synchronized (shards) {
+                shards.store(prefix, reg);
+            }
+
         } catch (final DOMDataTreeShardingConflictException e) {
-            LOG.error("Prefix {} is already occupied by another shard", prefix, e);
+            LOG.error("{}: Prefix {} is already occupied by another shard",
+                    distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), prefix, e);
         } catch (DOMDataTreeProducerException e) {
             LOG.error("Unable to close producer", e);
         } catch (DOMDataTreeShardCreationFailedException e) {
@@ -259,8 +421,10 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
 
     private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
         LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
-        final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
-                shards.lookup(prefix);
+        final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup;
+        synchronized (shards) {
+            lookup = shards.lookup(prefix);
+        }
 
         if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
             LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
@@ -270,7 +434,24 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
 
         lookup.getValue().close();
         // need to remove from our local table thats used for tracking
-        shards.remove(prefix);
+        synchronized (shards) {
+            shards.remove(prefix);
+        }
+
+        final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType());
+        final ListenableFuture<Void> future = writer.removeConfig(prefix.getRootIdentifier());
+
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(@Nullable Void result) {
+                LOG.debug("{} - Succesfuly removed shard for {}", memberName, prefix);
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                LOG.error("Removal of shard {} from configuration failed.", prefix, throwable);
+            }
+        });
     }
 
     DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
@@ -321,23 +502,35 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         }
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
             throws ExecutionException, InterruptedException {
-        final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
-                Cluster.get(actorSystem).state().members()).asJavaCollection();
-        final Collection<MemberName> names = Collections2.transform(members,
-            m -> MemberName.forName(m.roles().iterator().next()));
+        final Collection<MemberName> names =
+                distributedConfigDatastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards();
 
-        try {
-            // we should probably only have one node create the default shards
-            return createDistributedShard(
-                    new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)
-                    .toCompletableFuture().get();
-        } catch (DOMDataTreeShardingConflictException e) {
-            LOG.debug("Default shard already registered, possibly due to other node doing it faster");
+        final PrefixedShardConfigWriter writer = writerMap.get(logicalDatastoreType);
+
+        if (writer.checkDefaultIsPresent()) {
+            LOG.debug("Default shard for {} is already present in the config. Possibly saved in snapshot.",
+                    logicalDatastoreType);
             return new DistributedShardRegistrationImpl(
                     new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
                     shardedDataTreeActor, this);
+        } else {
+            try {
+                // we should probably only have one node create the default shards
+                return Await.result(FutureConverters.toScala(createDistributedShard(
+                        new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)),
+                        SHARD_FUTURE_TIMEOUT_DURATION);
+            } catch (DOMDataTreeShardingConflictException e) {
+                LOG.debug("Default shard already registered, possibly due to other node doing it faster");
+                return new DistributedShardRegistrationImpl(
+                        new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
+                        shardedDataTreeActor, this);
+            } catch (Exception e) {
+                LOG.error("{} default shard initialization failed", logicalDatastoreType, e);
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -390,7 +583,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
             distributedShardedDOMDataTree.despawnShardFrontend(prefix);
             // update the config so the remote nodes are updated
             final Future<Object> ask =
-                    Patterns.ask(shardedDataTreeActor, new RemovePrefixShard(prefix), SHARD_FUTURE_TIMEOUT);
+                    Patterns.ask(shardedDataTreeActor, new PrefixShardRemovalLookup(prefix), SHARD_FUTURE_TIMEOUT);
 
             final Future<Void> closeFuture = ask.transform(
                     new Mapper<Object, Void>() {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/LookupTask.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/LookupTask.java
new file mode 100644 (file)
index 0000000..ca33e31
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Base class for lookup tasks. Lookup tasks are supposed to run repeatedly
+ * until successful lookup or maximum retries are hit.
+ */
+@NotThreadSafe
+abstract class LookupTask implements Runnable {
+    private final int maxRetries;
+    private final ActorRef replyTo;
+    private int retries = 0;
+
+    LookupTask(final ActorRef replyTo, final int maxRetries) {
+        this.replyTo = replyTo;
+        this.maxRetries = maxRetries;
+    }
+
+    abstract void reschedule(int retries);
+
+    void tryReschedule(@Nullable final Throwable throwable) {
+        if (retries <= maxRetries) {
+            retries++;
+            reschedule(retries);
+        } else {
+            fail(throwable);
+        }
+    }
+
+    void fail(@Nullable final Throwable throwable) {
+        if (throwable == null) {
+            replyTo.tell(new Status.Failure(
+                    new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
+                            + "Failing..")), noSender());
+        } else {
+            replyTo.tell(new Status.Failure(
+                    new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
+                            + "Failing..", throwable)), noSender());
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java
new file mode 100644 (file)
index 0000000..f8ecdb8
--- /dev/null
@@ -0,0 +1,187 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_PREFIX_QNAME;
+import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_REPLICAS_QNAME;
+import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_REPLICA_QNAME;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens on changes on prefix-shard-configuration. Resolves the changes and
+ * notifies handling actor with {@link PrefixShardCreated} and
+ * {@link PrefixShardRemoved} messages.
+ */
+public class PrefixedShardConfigUpdateHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrefixedShardConfigUpdateHandler.class);
+    private final ActorRef handlingActor;
+    private final MemberName memberName;
+
+    private final EnumMap<LogicalDatastoreType,
+            ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener>> registrations =
+            new EnumMap<>(LogicalDatastoreType.class);
+
+    public PrefixedShardConfigUpdateHandler(final ActorRef handlingActor, final MemberName memberName) {
+        this.handlingActor = Preconditions.checkNotNull(handlingActor);
+        this.memberName = Preconditions.checkNotNull(memberName);
+    }
+
+    public void initListener(final AbstractDataStore dataStore, final LogicalDatastoreType type) {
+        registrations.put(type, dataStore.registerShardConfigListener(
+                ClusterUtils.SHARD_LIST_PATH, new ShardConfigHandler(memberName, type, handlingActor)));
+    }
+
+    public void close() {
+        registrations.values().forEach(ListenerRegistration::close);
+        registrations.clear();
+    }
+
+    public static final class ShardConfigHandler implements ClusteredDOMDataTreeChangeListener {
+
+        private final MemberName memberName;
+        private final LogicalDatastoreType type;
+        private final ActorRef handlingActor;
+
+        public ShardConfigHandler(final MemberName memberName,
+                           final LogicalDatastoreType type,
+                           final ActorRef handlingActor) {
+            this.memberName = memberName;
+            this.type = type;
+            this.handlingActor = handlingActor;
+        }
+
+        @Override
+        public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+            changes.forEach(this::resolveChange);
+        }
+
+        private void resolveChange(final DataTreeCandidate candidate) {
+            switch (candidate.getRootNode().getModificationType()) {
+                case UNMODIFIED:
+                    break;
+                case SUBTREE_MODIFIED:
+                case APPEARED:
+                case WRITE:
+                    resolveWrite(candidate.getRootNode());
+                    break;
+                case DELETE:
+                case DISAPPEARED:
+                    resolveDelete(candidate.getRootNode());
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private void resolveWrite(final DataTreeCandidateNode rootNode) {
+
+            LOG.debug("{}: New config received {}", memberName, rootNode);
+            LOG.debug("{}: Data after: {}", memberName, rootNode.getDataAfter());
+
+            // were in the shards list, iter children and resolve
+            for (final DataTreeCandidateNode childNode : rootNode.getChildNodes()) {
+                switch (childNode.getModificationType()) {
+                    case UNMODIFIED:
+                        break;
+                    case SUBTREE_MODIFIED:
+                    case APPEARED:
+                    case WRITE:
+                        resolveWrittenShard(childNode);
+                        break;
+                    case DELETE:
+                    case DISAPPEARED:
+                        resolveDeletedShard(childNode);
+                        break;
+                    default:
+                        break;
+                }
+            }
+        }
+
+        private void resolveWrittenShard(final DataTreeCandidateNode childNode) {
+            final MapEntryNode entryNode = (MapEntryNode) childNode.getDataAfter().get();
+            final LeafNode<YangInstanceIdentifier> prefix =
+                    (LeafNode<YangInstanceIdentifier>) entryNode.getChild(new NodeIdentifier(SHARD_PREFIX_QNAME)).get();
+
+            final YangInstanceIdentifier identifier = prefix.getValue();
+
+            LOG.debug("{}: Deserialized {} from datastore", memberName, identifier);
+
+            final ContainerNode replicas =
+                    (ContainerNode) entryNode.getChild(new NodeIdentifier(SHARD_REPLICAS_QNAME)).get();
+
+            final LeafSetNode<String> replicaList =
+                    (LeafSetNode<String>) replicas.getChild(new NodeIdentifier(SHARD_REPLICA_QNAME)).get();
+
+            final List<MemberName> retReplicas = replicaList.getValue().stream()
+                    .map(child -> MemberName.forName(child.getValue()))
+                    .collect(Collectors.toList());
+
+            LOG.debug("{}: Replicas read from ds {}", memberName, retReplicas.toString());
+
+            final PrefixShardConfiguration newConfig =
+                    new PrefixShardConfiguration(new DOMDataTreeIdentifier(type, identifier),
+                            PrefixShardStrategy.NAME, retReplicas);
+
+            LOG.debug("{}: Resulting config {}", memberName, newConfig);
+
+            handlingActor.tell(new PrefixShardCreated(newConfig), noSender());
+        }
+
+        private void resolveDeletedShard(final DataTreeCandidateNode childNode) {
+
+            final MapEntryNode entryNode = (MapEntryNode) childNode.getDataBefore().get();
+
+            final LeafNode<YangInstanceIdentifier> prefix =
+                    (LeafNode<YangInstanceIdentifier>) entryNode.getChild(new NodeIdentifier(SHARD_PREFIX_QNAME)).get();
+
+            final YangInstanceIdentifier deleted = prefix.getValue();
+            LOG.debug("{}: Removing shard at {}.", memberName, deleted);
+
+            final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(type, deleted);
+            final PrefixShardRemoved message = new PrefixShardRemoved(domDataTreeIdentifier);
+
+            handlingActor.tell(message, noSender());
+        }
+
+        private void resolveDelete(final DataTreeCandidateNode rootNode) {
+
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigWriter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigWriter.java
new file mode 100644 (file)
index 0000000..b0507f6
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapEntryNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes and removes prefix-based shards' configuration
+ * to prefix-shard-configuration. This classed is meant to be utilized
+ * by {@link DistributedShardedDOMDataTree} for updating
+ * prefix-shard-configuration upon creating and de-spawning prefix-based shards.
+ */
+class PrefixedShardConfigWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrefixedShardConfigWriter.class);
+
+    private final ClientLocalHistory history;
+
+    PrefixedShardConfigWriter(final DataStoreClient client) {
+        history = client.createLocalHistory();
+        writeInitialParent();
+    }
+
+    ListenableFuture<Void> writeConfig(final YangInstanceIdentifier path, final Collection<MemberName> replicas) {
+        LOG.debug("Writing config for {}, replicas {}", path, replicas);
+
+        return doSubmit(doWrite(path, replicas));
+    }
+
+    ListenableFuture<Void> removeConfig(final YangInstanceIdentifier path) {
+        LOG.debug("Removing config for {}.", path);
+
+        return doSubmit(doDelete(path));
+    }
+
+    private void writeInitialParent() {
+        final ClientTransaction tx = history.createTransaction();
+
+        final DOMDataTreeWriteCursor cursor = tx.openCursor();
+
+        final ContainerNode root = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(ClusterUtils.PREFIX_SHARDS_QNAME))
+                .withChild(ImmutableMapNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_LIST_QNAME))
+                        .build())
+                .build();
+
+        cursor.merge(ClusterUtils.PREFIX_SHARDS_PATH.getLastPathArgument(), root);
+        cursor.close();
+
+        final DOMStoreThreePhaseCommitCohort cohort = tx.ready();
+
+        submitBlocking(cohort);
+    }
+
+    private void submitBlocking(final DOMStoreThreePhaseCommitCohort cohort) {
+        try {
+            doSubmit(cohort).get();
+        } catch (final InterruptedException | ExecutionException e) {
+            LOG.error("Unable to write initial shard config parent.", e);
+        }
+    }
+
+    private ListenableFuture<Void> doSubmit(final DOMStoreThreePhaseCommitCohort cohort) {
+        final AsyncFunction<Boolean, Void> validateFunction = input -> cohort.preCommit();
+        final AsyncFunction<Void, Void> prepareFunction = input -> cohort.commit();
+
+        final ListenableFuture<Void> prepareFuture = Futures.transform(cohort.canCommit(), validateFunction);
+        return Futures.transform(prepareFuture, prepareFunction);
+    }
+
+    boolean checkDefaultIsPresent() {
+        final NodeIdentifierWithPredicates pag =
+                new NodeIdentifierWithPredicates(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME,
+                YangInstanceIdentifier.EMPTY);
+
+        final YangInstanceIdentifier defaultId = ClusterUtils.SHARD_LIST_PATH.node(pag);
+
+        final ClientSnapshot snapshot = history.takeSnapshot();
+        try {
+            return snapshot.exists(defaultId).checkedGet();
+        } catch (final ReadFailedException e) {
+            LOG.error("Presence check of default shard in configuration failed.", e);
+            return false;
+        }
+    }
+
+    private DOMStoreThreePhaseCommitCohort doWrite(final YangInstanceIdentifier path,
+                                                   final Collection<MemberName> replicas) {
+
+        final ListNodeBuilder<Object, LeafSetEntryNode<Object>> replicaListBuilder =
+                ImmutableLeafSetNodeBuilder.create().withNodeIdentifier(
+                        new NodeIdentifier(ClusterUtils.SHARD_REPLICA_QNAME));
+
+        replicas.forEach(name -> replicaListBuilder.withChild(
+                ImmutableLeafSetEntryNodeBuilder.create()
+                        .withNodeIdentifier(new NodeWithValue<>(ClusterUtils.SHARD_REPLICA_QNAME, name.getName()))
+                        .withValue(name.getName())
+                        .build()));
+
+        final MapEntryNode newEntry = ImmutableMapEntryNodeBuilder.create()
+                .withNodeIdentifier(
+                        new NodeIdentifierWithPredicates(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME,
+                                path))
+                .withChild(ImmutableLeafNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_PREFIX_QNAME))
+                        .withValue(path)
+                        .build())
+                .withChild(ImmutableContainerNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_REPLICAS_QNAME))
+                        .withChild(replicaListBuilder.build())
+                        .build())
+                .build();
+
+        final ClientTransaction tx = history.createTransaction();
+        final DOMDataTreeWriteCursor cursor = tx.openCursor();
+
+        ClusterUtils.SHARD_LIST_PATH.getPathArguments().forEach(cursor::enter);
+
+        cursor.write(newEntry.getIdentifier(), newEntry);
+        cursor.close();
+
+        return tx.ready();
+    }
+
+    private DOMStoreThreePhaseCommitCohort doDelete(final YangInstanceIdentifier path) {
+
+        final ClientTransaction tx = history.createTransaction();
+        final DOMDataTreeWriteCursor cursor = tx.openCursor();
+
+        ClusterUtils.SHARD_LIST_PATH.getPathArguments().forEach(cursor::enter);
+
+        cursor.delete(
+                new NodeIdentifierWithPredicates(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME, path));
+        cursor.close();
+
+        return tx.ready();
+    }
+}
index e8f3f70..cfbb526 100644 (file)
@@ -16,7 +16,6 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
-import akka.actor.Status.Failure;
 import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
@@ -27,29 +26,19 @@ import akka.cluster.ClusterEvent.MemberWeaklyUp;
 import akka.cluster.ClusterEvent.ReachableMember;
 import akka.cluster.ClusterEvent.UnreachableMember;
 import akka.cluster.Member;
-import akka.cluster.ddata.DistributedData;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.Replicator;
-import akka.cluster.ddata.Replicator.Changed;
-import akka.cluster.ddata.Replicator.Subscribe;
-import akka.cluster.ddata.Replicator.Update;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -59,14 +48,15 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
@@ -95,7 +85,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
 
     static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
-    static final int LOOKUP_TASK_MAX_RETRIES = 100;
 
     private final DistributedShardedDOMDataTree shardingService;
     private final ActorSystem actorSystem;
@@ -106,14 +95,13 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private final ShardingServiceAddressResolver resolver;
     private final DistributedDataStore distributedConfigDatastore;
     private final DistributedDataStore distributedOperDatastore;
+    private final int lookupTaskMaxRetries;
 
     private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
     private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
 
     private final Cluster cluster;
-    private final ActorRef replicator;
 
-    private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
     private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
 
     ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
@@ -124,21 +112,17 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         clusterWrapper = builder.getClusterWrapper();
         distributedConfigDatastore = builder.getDistributedConfigDatastore();
         distributedOperDatastore = builder.getDistributedOperDatastore();
+        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
         actorContext = distributedConfigDatastore.getActorContext();
         resolver = new ShardingServiceAddressResolver(
                 DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
 
         clusterWrapper.subscribeToMemberEvents(self());
         cluster = Cluster.get(actorSystem);
-
-        replicator = DistributedData.get(context().system()).replicator();
     }
 
     @Override
     public void preStart() {
-        final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
-                new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
-        replicator.tell(subscribe, noSender());
     }
 
     @Override
@@ -148,7 +132,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
     @Override
     protected void handleCommand(final Object message) throws Exception {
-        LOG.debug("Received {}", message);
+        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
         if (message instanceof ClusterEvent.MemberUp) {
             memberUp((ClusterEvent.MemberUp) message);
         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
@@ -161,8 +145,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             memberUnreachable((ClusterEvent.UnreachableMember) message);
         } else if (message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
-        } else if (message instanceof Changed) {
-            onConfigChanged((Changed) message);
         } else if (message instanceof ProducerCreated) {
             onProducerCreated((ProducerCreated) message);
         } else if (message instanceof NotifyProducerCreated) {
@@ -173,51 +155,17 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             onNotifyProducerRemoved((NotifyProducerRemoved) message);
         } else if (message instanceof PrefixShardCreated) {
             onPrefixShardCreated((PrefixShardCreated) message);
-        } else if (message instanceof CreatePrefixShard) {
-            onCreatePrefixShard((CreatePrefixShard) message);
-        } else if (message instanceof RemovePrefixShard) {
-            onRemovePrefixShard((RemovePrefixShard) message);
+        } else if (message instanceof LookupPrefixShard) {
+            onLookupPrefixShard((LookupPrefixShard) message);
+        } else if (message instanceof PrefixShardRemovalLookup) {
+            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
         } else if (message instanceof PrefixShardRemoved) {
             onPrefixShardRemoved((PrefixShardRemoved) message);
+        } else if (message instanceof StartConfigShardLookup) {
+            onStartConfigShardLookup((StartConfigShardLookup) message);
         }
     }
 
-    private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
-        LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);
-
-        currentData = change.dataValue();
-        final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
-
-        LOG.debug("Changed set {}", changedConfig);
-
-        try {
-            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
-                    changedConfig.values().stream().collect(
-                            Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
-            resolveConfig(newConfig);
-        } catch (final IllegalStateException e) {
-            LOG.error("Failed, ", e);
-        }
-
-    }
-
-    private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
-
-        // get the removed configurations
-        final SetView<DOMDataTreeIdentifier> deleted =
-                Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
-        shardingService.resolveShardRemovals(deleted);
-
-        // get the added configurations
-        final SetView<DOMDataTreeIdentifier> additions =
-                Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
-        shardingService.resolveShardAdditions(additions);
-        // we can ignore those that existed previously since the potential changes in replicas will be handled by
-        // shard manager.
-
-        currentConfiguration = new HashMap<>(newConfig);
-    }
-
     @Override
     public String persistenceId() {
         return PERSISTENCE_ID;
@@ -368,65 +316,34 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void onCreatePrefixShard(final CreatePrefixShard message) {
-        LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
+    private void onLookupPrefixShard(final LookupPrefixShard message) {
+        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
 
-        final PrefixShardConfiguration configuration = message.getConfiguration();
+        final DOMDataTreeIdentifier prefix = message.getPrefix();
 
-        final Update<ORMap<PrefixShardConfiguration>> update =
-                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
-                    map -> map.put(cluster, configuration.toDataMapKey(), configuration));
-
-        replicator.tell(update, self());
-
-        final ActorContext context =
-                configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION
+        final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                         ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
 
         // schedule a notification task for the reply
         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
                 new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
-                        context, shardingService, configuration.getPrefix()),
-                actorSystem.dispatcher());
+                        context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
     }
 
     private void onPrefixShardCreated(final PrefixShardCreated message) {
         LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
 
-        final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
-        final ActorRef sender = getSender();
-
-        final List<CompletableFuture<Object>> futures = new ArrayList<>();
+        final PrefixShardConfiguration config = message.getConfiguration();
 
-        for (final String address : addresses) {
-            final ActorSelection actorSelection = actorSystem.actorSelection(address);
-            futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
-                    new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
-        }
-
-        final CompletableFuture<Void> combinedFuture =
-                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
-
-        combinedFuture.thenRun(() -> {
-            sender.tell(new Status.Success(null), self());
-        }).exceptionally(throwable -> {
-            sender.tell(new Status.Failure(throwable), self());
-            return null;
-        });
+        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
     }
 
-    private void onRemovePrefixShard(final RemovePrefixShard message) {
-        LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
-
-        //TODO the removal message should have the configuration or some other way to get to the key
-        final Update<ORMap<PrefixShardConfiguration>> removal =
-                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
-                    map -> map.remove(cluster, "prefix=" + message.getPrefix()));
-        replicator.tell(removal, self());
+    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
+        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);
 
         final ShardRemovalLookupTask removalTask =
                 new ShardRemovalLookupTask(actorSystem, getSender(),
-                        actorContext, message.getPrefix());
+                        actorContext, message.getPrefix(), lookupTaskMaxRetries);
 
         actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
     }
@@ -434,15 +351,21 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
         LOG.debug("Received PrefixShardRemoved: {}", message);
 
-        final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
+        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
+    }
 
-        if (registration == null) {
-            LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}",
-                    message.getPrefix(), idToShardRegistration);
-            return;
-        }
+    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
+        LOG.debug("Received StartConfigShardLookup: {}", message);
 
-        registration.close();
+        final ActorContext context =
+                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
+                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
+
+        // schedule a notification task for the reply
+        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
+                new ConfigShardLookupTask(
+                        actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries),
+                actorSystem.dispatcher());
     }
 
     private static MemberName memberToName(final Member member) {
@@ -486,39 +409,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private abstract static class LookupTask implements Runnable {
-
-        private final ActorRef replyTo;
-        private int retries = 0;
-
-        private LookupTask(final ActorRef replyTo) {
-            this.replyTo = replyTo;
-        }
-
-        abstract void reschedule(int retries);
-
-        void tryReschedule(@Nullable final Throwable throwable) {
-            if (retries <= LOOKUP_TASK_MAX_RETRIES) {
-                retries++;
-                reschedule(retries);
-            } else {
-                fail(throwable);
-            }
-        }
-
-        void fail(@Nullable final Throwable throwable) {
-            if (throwable == null) {
-                replyTo.tell(new Failure(
-                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
-                                + "Failing..")), noSender());
-            } else {
-                replyTo.tell(new Failure(
-                        new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
-                                + "Failing..", throwable)), noSender());
-            }
-        }
-    }
-
     /**
      * Handles the lookup step of cds shard creation once the configuration is updated.
      */
@@ -530,20 +420,23 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         private final ActorContext context;
         private final DistributedShardedDOMDataTree shardingService;
         private final DOMDataTreeIdentifier toLookup;
+        private final int lookupMaxRetries;
 
         ShardCreationLookupTask(final ActorSystem system,
                                 final ActorRef replyTo,
                                 final ClusterWrapper clusterWrapper,
                                 final ActorContext context,
                                 final DistributedShardedDOMDataTree shardingService,
-                                final DOMDataTreeIdentifier toLookup) {
-            super(replyTo);
+                                final DOMDataTreeIdentifier toLookup,
+                                final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
             this.system = system;
             this.replyTo = replyTo;
             this.clusterWrapper = clusterWrapper;
             this.context = context;
             this.shardingService = shardingService;
             this.toLookup = toLookup;
+            this.lookupMaxRetries = lookupMaxRetries;
         }
 
         @Override
@@ -562,7 +455,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                         system.scheduler().scheduleOnce(
                                 SHARD_LOOKUP_TASK_INTERVAL,
                                 new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
-                                        shardingService, toLookup),
+                                        shardingService, toLookup, lookupMaxRetries),
                                 system.dispatcher());
                     }
                 }
@@ -589,6 +482,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         private final ActorRef shard;
         private final DistributedShardedDOMDataTree shardingService;
         private final DOMDataTreeIdentifier toLookup;
+        private final int lookupMaxRetries;
 
         ShardLeaderLookupTask(final ActorSystem system,
                               final ActorRef replyTo,
@@ -596,8 +490,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                               final ClusterWrapper clusterWrapper,
                               final ActorRef shard,
                               final DistributedShardedDOMDataTree shardingService,
-                              final DOMDataTreeIdentifier toLookup) {
-            super(replyTo);
+                              final DOMDataTreeIdentifier toLookup,
+                              final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
             this.system = system;
             this.replyTo = replyTo;
             this.context = context;
@@ -605,6 +500,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             this.shard = shard;
             this.shardingService = shardingService;
             this.toLookup = toLookup;
+            this.lookupMaxRetries = lookupMaxRetries;
         }
 
         @Override
@@ -626,7 +522,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
                                     clusterWrapper.getCurrentMemberName(), toLookup);
                             system.scheduler().scheduleOnce(
                                     SHARD_LOOKUP_TASK_INTERVAL,
-                                    new FrontendLookupTask(system, replyTo, shardingService, toLookup),
+                                    new FrontendLookupTask(
+                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
                                     system.dispatcher());
                         } else {
                             tryReschedule(null);
@@ -661,8 +558,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         FrontendLookupTask(final ActorSystem system,
                            final ActorRef replyTo,
                            final DistributedShardedDOMDataTree shardingService,
-                           final DOMDataTreeIdentifier toLookup) {
-            super(replyTo);
+                           final DOMDataTreeIdentifier toLookup,
+                           final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
             this.system = system;
             this.replyTo = replyTo;
             this.shardingService = shardingService;
@@ -720,8 +618,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         ShardRemovalLookupTask(final ActorSystem system,
                                final ActorRef replyTo,
                                final ActorContext context,
-                               final DOMDataTreeIdentifier toLookup) {
-            super(replyTo);
+                               final DOMDataTreeIdentifier toLookup,
+                               final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
             this.system = system;
             this.replyTo = replyTo;
             this.context = context;
@@ -757,6 +656,115 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    /**
+     * Task for handling the lookup of the backend for the configuration shard.
+     */
+    private static class ConfigShardLookupTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final ClusterWrapper clusterWrapper;
+        private final int lookupTaskMaxRetries;
+
+        ConfigShardLookupTask(final ActorSystem system,
+                              final ActorRef replyTo,
+                              final ActorContext context,
+                              final ClusterWrapper clusterWrapper,
+                              final StartConfigShardLookup message,
+                              final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.clusterWrapper = clusterWrapper;
+            this.lookupTaskMaxRetries = lookupMaxRetries;
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
+        }
+
+        @Override
+        public void run() {
+            final Optional<ActorRef> localShard =
+                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
+
+            if (!localShard.isPresent()) {
+                tryReschedule(null);
+            } else {
+                LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup..");
+                system.scheduler().scheduleOnce(
+                        SHARD_LOOKUP_TASK_INTERVAL,
+                        new ConfigShardReadinessTask(
+                                system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries),
+                        system.dispatcher());
+            }
+        }
+    }
+
+    /**
+     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
+     */
+    private static class ConfigShardReadinessTask extends LookupTask {
+
+        private final ActorSystem system;
+        private final ActorRef replyTo;
+        private final ActorContext context;
+        private final ClusterWrapper clusterWrapper;
+        private final ActorRef shard;
+
+        ConfigShardReadinessTask(final ActorSystem system,
+                                 final ActorRef replyTo,
+                                 final ActorContext context,
+                                 final ClusterWrapper clusterWrapper,
+                                 final ActorRef shard,
+                                 final int lookupMaxRetries) {
+            super(replyTo, lookupMaxRetries);
+            this.system = system;
+            this.replyTo = replyTo;
+            this.context = context;
+            this.clusterWrapper = clusterWrapper;
+            this.shard = shard;
+        }
+
+        @Override
+        void reschedule(int retries) {
+            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
+                    clusterWrapper.getCurrentMemberName(), retries);
+            system.scheduler().scheduleOnce(
+                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
+        }
+
+        @Override
+        public void run() {
+            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
+
+            ask.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+                    if (throwable != null) {
+                        tryReschedule(throwable);
+                    } else {
+                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
+                        final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
+                        if (leaderActor.isPresent()) {
+                            // leader is found, backend seems ready, check if the frontend is ready
+                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
+                                    clusterWrapper.getCurrentMemberName());
+                            replyTo.tell(new Status.Success(null), noSender());
+                        } else {
+                            tryReschedule(null);
+                        }
+                    }
+                }
+            }, system.dispatcher());
+        }
+    }
+
     public static class ShardedDataTreeActorCreator {
 
         private DistributedShardedDOMDataTree shardingService;
@@ -764,6 +772,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         private DistributedDataStore distributedOperDatastore;
         private ActorSystem actorSystem;
         private ClusterWrapper cluster;
+        private int maxRetries;
 
         public DistributedShardedDOMDataTree getShardingService() {
             return shardingService;
@@ -812,6 +821,15 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             return this;
         }
 
+        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
+            this.maxRetries = maxRetries;
+            return this;
+        }
+
+        public int getLookupTaskMaxRetries() {
+            return maxRetries;
+        }
+
         private void verify() {
             Preconditions.checkNotNull(shardingService);
             Preconditions.checkNotNull(actorSystem);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/InitConfigListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/InitConfigListener.java
new file mode 100644 (file)
index 0000000..bec9765
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+/**
+ * Message sent to the local ShardManager, once the shard configuration shard is ready and the ShardManager should
+ * start its listener.
+ */
+public class InitConfigListener {
+
+    public static final InitConfigListener INSTANCE = new InitConfigListener();
+
+    private InitConfigListener() {
+
+    }
+}
@@ -11,8 +11,8 @@ package org.opendaylight.controller.cluster.sharding.messages;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import java.io.Serializable;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 
 /**
  * Sent to the local {@link ShardedDataTreeActor} when there was a shard created
@@ -20,25 +20,25 @@ import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
  * create the required frontend/backend shards.
  */
 @Beta
-public class CreatePrefixShard implements Serializable {
+public class LookupPrefixShard implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final PrefixShardConfiguration configuration;
+    private final DOMDataTreeIdentifier prefix;
 
-    public CreatePrefixShard(final PrefixShardConfiguration configuration) {
-        this.configuration = Preconditions.checkNotNull(configuration);
+    public LookupPrefixShard(final DOMDataTreeIdentifier prefix) {
+        this.prefix = Preconditions.checkNotNull(prefix);
     }
 
-    public PrefixShardConfiguration getConfiguration() {
-        return configuration;
+    public DOMDataTreeIdentifier getPrefix() {
+        return prefix;
     }
 
 
     @Override
     public String toString() {
-        return "CreatePrefixShard{"
-                + "configuration="
-                + configuration
+        return "LookupPrefixShard{"
+                + "prefix="
+                + prefix
                 + '}';
     }
 }
@@ -13,15 +13,14 @@ import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 
 /**
- * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node.
- * The local actor should update the configuration so that the change is picked up by other CDS Node Agents and
- * backend ShardManagers.
+ * Sent to the local {@link ShardedDataTreeActor} to initiate the lookup of the shard, once the shard is removed from
+ * the system entirely the actor responds with a success.
  */
-public class RemovePrefixShard {
+public class PrefixShardRemovalLookup {
 
     private final DOMDataTreeIdentifier prefix;
 
-    public RemovePrefixShard(final DOMDataTreeIdentifier prefix) {
+    public PrefixShardRemovalLookup(final DOMDataTreeIdentifier prefix) {
 
         this.prefix = Preconditions.checkNotNull(prefix);
     }
@@ -32,7 +31,7 @@ public class RemovePrefixShard {
 
     @Override
     public String toString() {
-        return "RemovePrefixShard{"
+        return "PrefixShardRemovalLookup{"
                 + "prefix=" + prefix
                 + '}';
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/StartConfigShardLookup.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/StartConfigShardLookup.java
new file mode 100644 (file)
index 0000000..22e5dbf
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+
+/**
+ * Message that should be sent to ShardedDataTreeActor when the lookup of the prefix config shard should begin.
+ * Replied to with Succes once the shard has a leader.
+ */
+public class StartConfigShardLookup {
+
+    private LogicalDatastoreType type;
+
+    public StartConfigShardLookup(final LogicalDatastoreType type) {
+        this.type = type;
+    }
+
+    public LogicalDatastoreType getType() {
+        return type;
+    }
+
+    @Override
+    public String toString() {
+        return "StartConfigShardLookup{type=" + type + '}';
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/prefix-shard-configuration.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/prefix-shard-configuration.yang
new file mode 100644 (file)
index 0000000..02d5c30
--- /dev/null
@@ -0,0 +1,34 @@
+module prefix-shard-configuration {
+    yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:clustering:prefix-shard-configuration";
+    prefix "prefix-config";
+
+    description
+        "This module contains the base YANG definitions for
+        shards based on prefix configuration";
+
+    revision "2017-01-10" {
+        description "Initial revision.";
+    }
+
+    container prefix-shards {
+
+        list shard {
+            key prefix;
+            leaf prefix {
+                type instance-identifier;
+                description "Prefix that this shard is rooted at.";
+            }
+
+            container replicas {
+                leaf-list replica {
+                    type string;
+                }
+
+                description "List of cluster member nodes that this shard is replicated on";
+            }
+
+            description "List of prefix-based shards configured.";
+        }
+    }
+}
index 80cc2e7..fc6445e 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.mockito.Mockito.mock;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 import akka.actor.ActorRef;
@@ -51,11 +52,12 @@ public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
     protected static CountDownLatch ready;
 
     protected TestShardManager.Builder newTestShardMgrBuilder() {
-        return TestShardManager.builder(datastoreContextBuilder);
+        return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class));
     }
 
     protected TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
-        return TestShardManager.builder(datastoreContextBuilder).configuration(config);
+        return TestShardManager.builder(datastoreContextBuilder).configuration(config)
+                .distributedDataStore(mock(DistributedDataStore.class));
     }
 
     protected Props newShardMgrProps(final Configuration config) {
index b336365..86c8759 100644 (file)
@@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
@@ -84,8 +85,15 @@ public class IntegrationTestKit extends ShardTestKit {
 
     public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
             final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) {
+        return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader,
+                schemaContext, shardNames);
+    }
+
+    public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+                                                       final String modulesConfig, final boolean waitUntilLeader,
+                                                       final SchemaContext schemaContext, final String... shardNames) {
         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
-        final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
+        final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
 
         datastoreContextBuilder.dataStoreName(typeName);
 
@@ -129,6 +137,30 @@ public class IntegrationTestKit extends ShardTestKit {
         return dataStore;
     }
 
+    public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
+                                                                       final SchemaContext schemaContext,
+                                                                       final LogicalDatastoreType storeType) {
+        final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
+        final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
+
+        getDatastoreContextBuilder().dataStoreName(typeName);
+
+        final DatastoreContext datastoreContext =
+                getDatastoreContextBuilder().logicalStoreType(storeType).build();
+
+        final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+
+        final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
+                configuration, mockContextFactory, restoreFromSnapshot);
+
+        dataStore.onGlobalContextUpdated(schemaContext);
+
+        datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
+        return dataStore;
+    }
+
     public void waitUntilLeader(final ActorContext actorContext, final String... shardNames) {
         for (String shardName: shardNames) {
             ActorRef shard = findLocalShard(actorContext, shardName);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java
deleted file mode 100644 (file)
index 289c6b7..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.Status.Success;
-import akka.testkit.JavaTestKit;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.Shard.Builder;
-import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
-import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tests prefix shard creation in ShardManager.
- */
-public class PrefixShardCreationTest extends AbstractShardManagerTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PrefixShardCreationTest.class);
-
-    private static final DOMDataTreeIdentifier TEST_ID =
-            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
-
-    @Test
-    public void testPrefixShardCreation() throws Exception {
-
-        LOG.info("testPrefixShardCreation starting");
-        new JavaTestKit(getSystem()) {
-            {
-                datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
-
-                final ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
-                        new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
-
-                final SchemaContext schemaContext = TestModel.createTestContext();
-                shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
-
-                shardManager.tell(new FindLocalShard(
-                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
-
-                final Builder builder = Shard.builder();
-
-                final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
-                        new PrefixShardConfiguration(TEST_ID,
-                                PrefixShardStrategy.NAME,
-                                Collections.singletonList(MEMBER_1)),
-                        datastoreContextBuilder.build(), builder);
-
-                shardManager.tell(createPrefixedShard, getRef());
-                expectMsgClass(duration("5 seconds"), Success.class);
-
-                shardManager.tell(new FindLocalShard(
-                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-            }
-        };
-    }
-}
\ No newline at end of file
index c6cc641..6a8830c 100644 (file)
@@ -67,6 +67,7 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
@@ -168,7 +169,8 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     }
 
     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
-        return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
+        return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
+                .distributedDataStore(mock(DistributedDataStore.class));
     }
 
 
index 8564e2d..df870f5 100644 (file)
@@ -19,9 +19,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
-import akka.actor.PoisonPill;
 import akka.cluster.Cluster;
-import akka.cluster.ddata.DistributedData;
 import akka.testkit.JavaTestKit;
 import com.google.common.collect.Lists;
 import com.typesafe.config.ConfigFactory;
@@ -29,7 +27,6 @@ import java.util.Collections;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
@@ -39,6 +36,8 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -54,7 +53,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLe
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Ignore("Needs to have the configuration backend switched from distributed-data")
 public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
@@ -65,6 +63,8 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
     private static final DOMDataTreeIdentifier TEST_ID =
             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
 
+    private static final String MODULE_SHARDS_CONFIG = "module-shards-cars-member-1-and-2.conf";
+
     private ActorSystem leaderSystem;
     private ActorSystem followerSystem;
 
@@ -79,19 +79,25 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
                     .logicalStoreType(
                             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
 
-    private DistributedDataStore followerDistributedDataStore;
-    private DistributedDataStore leaderDistributedDataStore;
+    private DistributedDataStore leaderConfigDatastore;
+    private DistributedDataStore leaderOperDatastore;
+
+    private DistributedDataStore followerConfigDatastore;
+    private DistributedDataStore followerOperDatastore;
+
+
     private IntegrationTestKit followerTestKit;
     private IntegrationTestKit leaderTestKit;
-
     private DistributedShardedDOMDataTree leaderShardFactory;
-    private DistributedShardedDOMDataTree followerShardFactory;
 
+    private DistributedShardedDOMDataTree followerShardFactory;
     private ActorSystemProvider leaderSystemProvider;
     private ActorSystemProvider followerSystemProvider;
 
     @Before
     public void setUp() {
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
 
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
@@ -109,45 +115,67 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
 
     @After
     public void tearDown() {
-        if (followerDistributedDataStore != null) {
-            followerDistributedDataStore.close();
+        if (leaderConfigDatastore != null) {
+            leaderConfigDatastore.close();
         }
-        if (leaderDistributedDataStore != null) {
-            leaderDistributedDataStore.close();
+        if (leaderOperDatastore != null) {
+            leaderOperDatastore.close();
         }
 
-        DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
-        DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
+        if (followerConfigDatastore != null) {
+            followerConfigDatastore.close();
+        }
+        if (followerOperDatastore != null) {
+            followerOperDatastore.close();
+        }
 
         JavaTestKit.shutdownActorSystem(leaderSystem);
         JavaTestKit.shutdownActorSystem(followerSystem);
+
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
-    private void initEmptyDatastores(final String type) {
+    private void initEmptyDatastores() {
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
 
-        leaderDistributedDataStore =
-                leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+        leaderConfigDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore(
+                "config", MODULE_SHARDS_CONFIG, true,
+                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+        leaderOperDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore(
+                "operational", MODULE_SHARDS_CONFIG, true,
+                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+
+        leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
+                leaderOperDatastore,
+                leaderConfigDatastore);
 
         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
-        followerDistributedDataStore =
-                followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
 
-        leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
-                leaderDistributedDataStore,
-                leaderDistributedDataStore);
+        followerConfigDatastore = (DistributedDataStore) followerTestKit.setupDistributedDataStore(
+                "config", MODULE_SHARDS_CONFIG, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+        followerOperDatastore = (DistributedDataStore) followerTestKit.setupDistributedDataStore(
+                "operational", MODULE_SHARDS_CONFIG, true,
+                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
 
         followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
-                followerDistributedDataStore,
-                followerDistributedDataStore);
+                followerOperDatastore,
+                followerConfigDatastore);
+
+        leaderShardFactory.init();
+        followerShardFactory.init();
+
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
+                ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
+
     }
 
     @Test
     public void testProducerRegistrations() throws Exception {
-        initEmptyDatastores("config");
+        initEmptyDatastores();
 
         leaderTestKit.waitForMembersUp("member-2");
 
@@ -156,15 +184,15 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
                         TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                         DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
 
-        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+        final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager();
 
-        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
 
-        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
 
         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
@@ -196,11 +224,13 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
         } catch (final DOMDataTreeShardingConflictException e) {
             assertTrue(e.getMessage().contains("is already occupied by another shard"));
         }
+
+        shardRegistration.close().toCompletableFuture().get();
     }
 
     @Test
     public void testWriteIntoMultipleShards() throws Exception {
-        initEmptyDatastores("config");
+        initEmptyDatastores();
 
         leaderTestKit.waitForMembersUp("member-2");
 
@@ -211,9 +241,9 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
                         DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
-        findLocalShard(followerDistributedDataStore.getActorContext(),
+        findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
 
         LOG.debug("Got after waiting for nonleader");
@@ -232,11 +262,13 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
         LOG.warn("Got to pre submit");
 
         tx.submit().checkedGet();
+
+        shardRegistration.close().toCompletableFuture().get();
     }
 
     @Test
     public void testMultipleShardRegistrations() throws Exception {
-        initEmptyDatastores("config");
+        initEmptyDatastores();
 
         final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                 TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
@@ -257,73 +289,73 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
                 Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+        leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
 
         // check leader has local shards
-        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
 
-        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
 
-        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
 
-        assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
 
         // check follower has local shards
-        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
 
-        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
 
-        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
 
-        assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+        assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
 
 
         LOG.debug("Closing registrations");
 
-        reg1.close();
-        reg2.close();
-        reg3.close();
-        reg4.close();
+        reg1.close().toCompletableFuture().get();
+        reg2.close().toCompletableFuture().get();
+        reg3.close().toCompletableFuture().get();
+        reg4.close().toCompletableFuture().get();
 
-        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
 
-        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
 
-        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
 
-        waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
 
         LOG.debug("All leader shards gone");
 
-        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
 
-        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
 
-        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
 
-        waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+        waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
                 ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
 
         LOG.debug("All follower shards gone");
@@ -331,7 +363,7 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
 
     @Test
     public void testMultipleRegistrationsAtOnePrefix() throws Exception {
-        initEmptyDatastores("config");
+        initEmptyDatastores();
 
         for (int i = 0; i < 10; i++) {
             LOG.debug("Round {}", i);
@@ -339,22 +371,23 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
                     TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                     DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-            leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+            leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
 
-            assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+            assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
 
-            assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+            assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
 
             waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-            waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+            waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
 
-            waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+            waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
                     ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
         }
+
     }
 }
index 91435be..33a8e59 100644 (file)
@@ -55,6 +55,8 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -82,7 +84,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMa
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Ignore("distributed-data is broken needs to be removed")
 public class DistributedShardedDOMDataTreeTest extends AbstractTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
@@ -99,6 +100,8 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
                             .node(TestModel.INNER_LIST_QNAME));
     private static final Set<MemberName> SINGLE_MEMBER = Collections.singleton(AbstractTest.MEMBER_NAME);
 
+    private static final String MODULE_SHARDS_CONFIG = "module-shards-cars-member-1.conf";
+
     private ActorSystem leaderSystem;
 
     private final Builder leaderDatastoreContextBuilder =
@@ -109,6 +112,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
                             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
 
     private DistributedDataStore leaderDistributedDataStore;
+    private DistributedDataStore operDistributedDatastore;
     private IntegrationTestKit leaderTestKit;
 
     private DistributedShardedDOMDataTree leaderShardFactory;
@@ -124,6 +128,9 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
 
@@ -132,30 +139,43 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
     }
 
     @After
-    public void tearDown() {
+    public void tearDown() throws Exception {
         if (leaderDistributedDataStore != null) {
             leaderDistributedDataStore.close();
         }
 
+        if (operDistributedDatastore != null) {
+            operDistributedDatastore.close();
+        }
+
         JavaTestKit.shutdownActorSystem(leaderSystem);
+
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
-    private void initEmptyDatastore(final String type) {
+    private void initEmptyDatastores() {
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
 
-        leaderDistributedDataStore =
-                leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+        leaderDistributedDataStore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore(
+                "config", MODULE_SHARDS_CONFIG, "empty-modules.conf", true,
+                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
 
+        operDistributedDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore(
+                "operational", MODULE_SHARDS_CONFIG, "empty-modules.conf",true,
+                SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
 
         leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
-                leaderDistributedDataStore,
+                operDistributedDatastore,
                 leaderDistributedDataStore);
+
+        leaderShardFactory.init();
     }
 
 
     @Test
     public void testWritesIntoDefaultShard() throws Exception {
-        initEmptyDatastore("config");
+        initEmptyDatastores();
 
         final DOMDataTreeIdentifier configRoot =
                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
@@ -180,7 +200,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
 
     @Test
     public void testSingleNodeWrites() throws Exception {
-        initEmptyDatastore("config");
+        initEmptyDatastores();
 
         final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
                 leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
@@ -198,6 +218,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
                 YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
         final LeafNode<String> valueToCheck = ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
                 new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build();
+        LOG.debug("Writing data {} at {}, cursor {}", nameId.getLastPathArgument(), valueToCheck, cursor);
         cursor.write(nameId.getLastPathArgument(),
                 valueToCheck);
 
@@ -225,11 +246,13 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
 
         verifyNoMoreInteractions(mockedDataTreeListener);
 
+        shardRegistration.close().toCompletableFuture().get();
+
     }
 
     @Test
     public void testMultipleWritesIntoSingleMapEntry() throws Exception {
-        initEmptyDatastore("config");
+        initEmptyDatastores();
 
         final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
                 leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
@@ -312,11 +335,12 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
     }
 
     // top level shard at TEST element, with subshards on each outer-list map entry
+    @Ignore("https://bugs.opendaylight.org/show_bug.cgi?id=8116")
     @Test
     public void testMultipleShardLevels() throws Exception {
-        initEmptyDatastore("config");
+        initEmptyDatastores();
 
-        final DistributedShardRegistration testShardId = waitOnAsyncTask(
+        final DistributedShardRegistration testShardReg = waitOnAsyncTask(
                 leaderShardFactory.createDistributedShard(TEST_ID, SINGLE_MEMBER),
                 DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
@@ -374,7 +398,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
                 true, Collections.emptyList());
 
         // need 6 invocations, first initial thats from the parent shard, and then each individual subshard
-        verify(mockedDataTreeListener, timeout(10000).times(6)).onDataTreeChanged(captorForChanges.capture(),
+        verify(mockedDataTreeListener, timeout(20000).times(6)).onDataTreeChanged(captorForChanges.capture(),
                 captorForSubtrees.capture());
         verifyNoMoreInteractions(mockedDataTreeListener);
         final List<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> allSubtrees = captorForSubtrees.getAllValues();
@@ -391,49 +415,19 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
                                 .withValue(createOuterEntries(listSize, "testing-values")).build())
                         .build();
 
-        assertEquals(expected, actual);
-    }
-
-    @Test
-    public void testDistributedData() throws Exception {
-        initEmptyDatastore("config");
-
-        waitOnAsyncTask(
-                leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
-                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
-        waitOnAsyncTask(
-                leaderShardFactory.createDistributedShard(
-                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME)),
-                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
-        waitOnAsyncTask(
-                leaderShardFactory.createDistributedShard(
-                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME)),
-                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+        for (final DistributedShardRegistration registration : registrations) {
+            waitOnAsyncTask(registration.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+        }
 
-        waitOnAsyncTask(
-                leaderShardFactory.createDistributedShard(
-                        new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
-                        Lists.newArrayList(AbstractTest.MEMBER_NAME)),
-                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
-                ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
-                ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
-                ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
-                ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+        waitOnAsyncTask(testShardReg.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
+        assertEquals(expected, actual);
     }
 
     @Test
     public void testMultipleRegistrationsAtOnePrefix() throws Exception {
-        initEmptyDatastore("config");
+        initEmptyDatastores();
 
         for (int i = 0; i < 10; i++) {
             LOG.debug("Round {}", i);
index 164f363..4e906a4 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.md.cluster.datastore.model;
 
 import com.google.common.base.Throwables;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -46,6 +48,19 @@ public class SchemaContextHelper {
         }
     }
 
+    public static SchemaContext distributedShardedDOMDataTreeSchemaContext() {
+        final List<InputStream> streams = new ArrayList<>();
+        try {
+            // we need prefix-shard-configuration and odl-datastore-test models
+            // for DistributedShardedDOMDataTree tests
+            streams.add(getInputStream(ODL_DATASTORE_TEST_YANG));
+            streams.add(new FileInputStream("src/main/yang/prefix-shard-configuration.yang"));
+            return YangParserTestUtils.parseYangStreams(streams);
+        } catch (FileNotFoundException | ReactorException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static SchemaContext entityOwners() {
         try {
             return YangParserTestUtils.parseYangSources(new File("src/main/yang/entity-owners.yang"));
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/empty-modules.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/empty-modules.conf
new file mode 100644 (file)
index 0000000..14c0916
--- /dev/null
@@ -0,0 +1 @@
+modules = []
\ No newline at end of file

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.