BUG 2138: Introduce DistributedShardedDOMDataTree 43/44943/61
authorTomas Cere <tcere@cisco.com>
Wed, 31 Aug 2016 15:22:30 +0000 (17:22 +0200)
committerTomas Cere <tcere@cisco.com>
Mon, 2 Jan 2017 11:48:33 +0000 (12:48 +0100)
This is the initial patch that introduces the concept of
prefix based shards and consumer/producer api's into the
DistributedDatastore.

Each shard has a frontend registered into the ShardedDOMDataTree
that forwards calls into the DistributedDataStoreClient frontend
which then handles the actual writes into the Shard.
These ShardFrontends are then distributed into other nodes
via ShardedDataTreeActor which also handles notification of
other nodes of open Producers.

Change-Id: Ifcbd1021fdaeac7929fc547e6e32be50da9d93fc
Signed-off-by: Tomas Cere <tcere@cisco.com>
Signed-off-by: Robert Varga <rovarga@cisco.com>
18 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DOMDataTreeShardCreationFailedException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFactory.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedShardFactory.java with 92% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardFrontend.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardingServiceAddressResolver.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerCreated.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerRemoved.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerCreated.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerRemoved.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DOMDataTreeShardCreationFailedException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DOMDataTreeShardCreationFailedException.java
new file mode 100644 (file)
index 0000000..f6628ad
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.annotations.Beta;
+import javax.annotation.Nonnull;
+
+/**
+ * Exception thrown when there was a at any point during the creation of a shard via {@link DistributedShardFactory}.
+ */
+@Beta
+public class DOMDataTreeShardCreationFailedException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public DOMDataTreeShardCreationFailedException(final @Nonnull String message) {
+        super(message);
+    }
+
+    public DOMDataTreeShardCreationFailedException(final @Nonnull String message, final @Nonnull Throwable cause) {
+        super(message, cause);
+    }
+}
@@ -6,8 +6,9 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.datastore;
+package org.opendaylight.controller.cluster.sharding;
 
+import com.google.common.annotations.Beta;
 import java.util.Collection;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
@@ -20,6 +21,7 @@ import org.opendaylight.yangtools.concepts.Registration;
  * all the boilerplate that comes with registration of a new clustered shard into the system and creating the backend
  * shard/replicas that come along with it.
  */
+@Beta
 public interface DistributedShardFactory {
 
     /**
@@ -36,7 +38,8 @@ public interface DistributedShardFactory {
      */
     DistributedShardRegistration createDistributedShard(DOMDataTreeIdentifier prefix,
                                                         Collection<MemberName> replicaMembers)
-            throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException;
+            throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException,
+            DOMDataTreeShardCreationFailedException;
 
     interface DistributedShardRegistration extends Registration {
         @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
new file mode 100644 (file)
index 0000000..91b479d
--- /dev/null
@@ -0,0 +1,318 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
+import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+/**
+ * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
+ * {@link ShardedDataTreeActor}. Also provides QoL method for addition of prefix based clustered shard into the system.
+ */
+public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService,
+        DistributedShardFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);
+
+    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
+    private static final int MAX_ACTOR_CREATION_RETRIES = 100;
+    private static final int ACTOR_RETRY_DELAY = 100;
+    private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
+
+    static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";
+
+    private final ShardedDOMDataTree shardedDOMDataTree;
+    private final ActorSystem actorSystem;
+    private final DistributedDataStore distributedOperDatastore;
+    private final DistributedDataStore distributedConfigDatastore;
+
+    private final ActorRef shardedDataTreeActor;
+    private final MemberName memberName;
+
+    public DistributedShardedDOMDataTree(final ActorSystem actorSystem,
+                                         final DistributedDataStore distributedOperDatastore,
+                                         final DistributedDataStore distributedConfigDatastore) {
+        this.actorSystem = Preconditions.checkNotNull(actorSystem);
+        this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore);
+        this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore);
+        shardedDOMDataTree = new ShardedDOMDataTree();
+
+        shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
+                new ShardedDataTreeActorCreator()
+                        .setDataTreeService(shardedDOMDataTree)
+                        .setShardingService(shardedDOMDataTree)
+                        .setActorSystem(actorSystem)
+                        .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
+                        .setDistributedConfigDatastore(distributedConfigDatastore)
+                        .setDistributedOperDatastore(distributedOperDatastore),
+                ACTOR_ID);
+
+        this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();
+    }
+
+    @Nonnull
+    @Override
+    public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
+            final T listener, final Collection<DOMDataTreeIdentifier> subtrees,
+            final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers)
+            throws DOMDataTreeLoopException {
+
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Nonnull
+    @Override
+    public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
+        LOG.debug("Creating producer for {}", subtrees);
+        final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);
+
+        final Object response = distributedConfigDatastore.getActorContext()
+                .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
+        if (response == null) {
+            LOG.debug("Received success from remote nodes, creating producer:{}", subtrees);
+            return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
+                    distributedConfigDatastore.getActorContext());
+        } else if (response instanceof Exception) {
+            closeProducer(producer);
+            throw Throwables.propagate((Exception) response);
+        } else {
+            closeProducer(producer);
+            throw new RuntimeException("Unexpected response to create producer received." + response);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public DistributedShardRegistration createDistributedShard(
+            final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
+            throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException,
+            DOMDataTreeShardCreationFailedException {
+
+        final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
+        final DistributedDataStore distributedDataStore =
+                prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
+                        ? distributedConfigDatastore : distributedOperDatastore;
+
+        final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
+        if (replicaMembers.contains(memberName)) {
+            // spawn the backend shard and have the shard Manager create all replicas
+            final ActorRef shardManager = distributedDataStore.getActorContext().getShardManager();
+
+            shardManager.tell(new CreatePrefixedShard(config, null, Shard.builder()), noSender());
+        }
+
+        LOG.debug("Creating distributed datastore client for shard {}", shardName);
+        final Props distributedDataStoreClientProps =
+                SimpleDataStoreClientActor
+                        .props(memberName, "Shard-" + shardName, distributedDataStore.getActorContext(), shardName);
+
+        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+        final DataStoreClient client;
+        try {
+            client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (final Exception e) {
+            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), noSender());
+            throw new DOMDataTreeProducerException("Unable to create producer", e);
+        }
+
+        // register the frontend into the sharding service and let the actor distribute this onto the other nodes
+        final ListenerRegistration<ShardFrontend> shardFrontendRegistration;
+        try (DOMDataTreeProducer producer = createProducer(Collections.singletonList(prefix))) {
+            shardFrontendRegistration = shardedDOMDataTree
+                    .registerDataTreeShard(prefix,
+                            new ShardFrontend(client, prefix),
+                            ((ProxyProducer) producer).delegate());
+        }
+
+        final Future<Object> future = distributedDataStore.getActorContext()
+                .executeOperationAsync(shardedDataTreeActor, new PrefixShardCreated(config), DEFAULT_ASK_TIMEOUT);
+        try {
+            final Object result = Await.result(future, DEFAULT_ASK_TIMEOUT.duration());
+            if (result != null) {
+                throw new DOMDataTreeShardCreationFailedException("Received unexpected response to PrefixShardCreated"
+                        + result);
+            }
+
+            return new DistributedShardRegistrationImpl(shardFrontendRegistration, prefix, shardedDataTreeActor);
+        } catch (final CompletionException e) {
+            shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
+            clientActor.tell(PoisonPill.getInstance(), noSender());
+
+            final Throwable cause = e.getCause();
+            if (cause instanceof DOMDataTreeShardingConflictException) {
+                throw (DOMDataTreeShardingConflictException) cause;
+            }
+
+            throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e.getCause());
+        } catch (final Exception e) {
+            shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
+            clientActor.tell(PoisonPill.getInstance(), noSender());
+
+            throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e);
+        }
+    }
+
+    @Nonnull
+    @Override
+    public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
+            @Nonnull final DOMDataTreeIdentifier prefix,
+            @Nonnull final T shard,
+            @Nonnull final DOMDataTreeProducer producer)
+            throws DOMDataTreeShardingConflictException {
+
+        LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix);
+
+        return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
+    }
+
+    private static void closeProducer(final DOMDataTreeProducer producer) {
+        try {
+            producer.close();
+        } catch (final DOMDataTreeProducerException e) {
+            LOG.error("Unable to close producer", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem,
+                                                       final ShardedDataTreeActorCreator creator,
+                                                       final String shardDataTreeActorId) {
+        Exception lastException = null;
+
+        for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) {
+            try {
+                return actorSystem.actorOf(creator.props(), shardDataTreeActorId);
+            } catch (final Exception e) {
+                lastException = e;
+                Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT);
+                LOG.debug("Could not create actor {} because of {} -"
+                                + " waiting for sometime before retrying (retry count = {})",
+                        shardDataTreeActorId, e.getMessage(), i);
+            }
+        }
+
+        throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
+    }
+
+    private static class DistributedShardRegistrationImpl implements DistributedShardRegistration {
+        private final ListenerRegistration<ShardFrontend> registration;
+        private final DOMDataTreeIdentifier prefix;
+        private final ActorRef shardedDataTreeActor;
+
+        DistributedShardRegistrationImpl(final ListenerRegistration<ShardFrontend> registration,
+                                         final DOMDataTreeIdentifier prefix,
+                                         final ActorRef shardedDataTreeActor) {
+            this.registration = registration;
+            this.prefix = prefix;
+            this.shardedDataTreeActor = shardedDataTreeActor;
+        }
+
+        @Override
+        public void close() {
+            // TODO send the correct messages to ShardManager to destroy the shard
+            // maybe we could provide replica removal mechanisms also?
+            shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
+            registration.close();
+        }
+    }
+
+    private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {
+
+        private final DOMDataTreeProducer delegate;
+        private final Collection<DOMDataTreeIdentifier> subtrees;
+        private final ActorRef shardDataTreeActor;
+        private final ActorContext actorContext;
+
+        ProxyProducer(final DOMDataTreeProducer delegate,
+                      final Collection<DOMDataTreeIdentifier> subtrees,
+                      final ActorRef shardDataTreeActor,
+                      final ActorContext actorContext) {
+            this.delegate = Preconditions.checkNotNull(delegate);
+            this.subtrees = Preconditions.checkNotNull(subtrees);
+            this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor);
+            this.actorContext = Preconditions.checkNotNull(actorContext);
+        }
+
+        @Nonnull
+        @Override
+        public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
+            return delegate.createTransaction(isolated);
+        }
+
+        @Nonnull
+        @Override
+        public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
+            // TODO we probably don't need to distribute this on the remote nodes since once we have this producer
+            // open we surely have the rights to all the subtrees.
+            return delegate.createProducer(subtrees);
+        }
+
+        @Override
+        public void close() throws DOMDataTreeProducerException {
+            delegate.close();
+
+            final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
+            if (o instanceof DOMDataTreeProducerException) {
+                throw ((DOMDataTreeProducerException) o);
+            } else if (o instanceof Throwable) {
+                throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
+            }
+        }
+
+        @Override
+        protected DOMDataTreeProducer delegate() {
+            return delegate;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardFrontend.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardFrontend.java
new file mode 100644 (file)
index 0000000..d39ccec
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
+import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Proxy implementation of a shard that creates forwarding producers to the backend shard.
+ */
+class ShardFrontend implements ReadableWriteableDOMDataTreeShard {
+
+    private final DataStoreClient client;
+    private final DOMDataTreeIdentifier shardRoot;
+
+    ShardFrontend(final DataStoreClient client,
+                  final DOMDataTreeIdentifier shardRoot) {
+        this.client = Preconditions.checkNotNull(client);
+        this.shardRoot = Preconditions.checkNotNull(shardRoot);
+    }
+
+    @Override
+    public DOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> paths) {
+        return new ShardProxyProducer(shardRoot, paths, client);
+    }
+
+    @Override
+    public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+        // TODO message directly into the shard
+    }
+
+    @Override
+    public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+        // TODO message directly into the shard
+    }
+
+    @Nonnull
+    @Override
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+            final YangInstanceIdentifier treeId, final L listener) {
+        throw new UnsupportedOperationException("Registering data tree change listener is not supported");
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyProducer.java
new file mode 100644 (file)
index 0000000..f977076
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
+
+/**
+ * Proxy producer implementation that creates transactions that forward all calls to {@link DataStoreClient}.
+ */
+class ShardProxyProducer implements DOMDataTreeShardProducer {
+
+    private final DOMDataTreeIdentifier shardRoot;
+    private final Collection<DOMDataTreeIdentifier> prefixes;
+    private final DataStoreClient client;
+
+    ShardProxyProducer(final DOMDataTreeIdentifier shardRoot, final Collection<DOMDataTreeIdentifier> prefixes,
+                       final DataStoreClient client) {
+        this.shardRoot = Preconditions.checkNotNull(shardRoot);
+        this.prefixes = ImmutableList.copyOf(Preconditions.checkNotNull(prefixes));
+        this.client = Preconditions.checkNotNull(client);
+    }
+
+    @Nonnull
+    @Override
+    public Collection<DOMDataTreeIdentifier> getPrefixes() {
+        return prefixes;
+    }
+
+    @Override
+    public DOMDataTreeShardWriteTransaction createTransaction() {
+        return new ShardProxyTransaction(shardRoot, prefixes, client);
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java
new file mode 100644 (file)
index 0000000..b3c8dfc
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
+ * {@link ClientTransaction} calls.
+ */
+class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
+    private static final ListenableFuture<Void> NULL_FUTURE = Futures.immediateFuture(null);
+    private static final ListenableFuture<Boolean> VALIDATE_FUTURE = Futures.immediateFuture(true);
+
+    private final DOMDataTreeIdentifier shardRoot;
+    private final Collection<DOMDataTreeIdentifier> prefixes;
+    private final DataStoreClient client;
+    private final ClientLocalHistory history;
+    private ClientTransaction currentTx;
+    private DOMStoreThreePhaseCommitCohort cohort;
+
+
+    ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot, final Collection<DOMDataTreeIdentifier> prefixes,
+                          final DataStoreClient client) {
+        this.shardRoot = Preconditions.checkNotNull(shardRoot);
+        this.prefixes = Preconditions.checkNotNull(prefixes);
+        this.client = Preconditions.checkNotNull(client);
+        history = client.createLocalHistory();
+        currentTx = history.createTransaction();
+    }
+
+    @Nonnull
+    @Override
+    public DOMDataTreeWriteCursor createCursor(@Nonnull final DOMDataTreeIdentifier prefix) {
+        checkAvailable(prefix);
+
+        return currentTx.openCursor();
+    }
+
+    private void checkAvailable(final DOMDataTreeIdentifier prefix) {
+        for (final DOMDataTreeIdentifier p : prefixes) {
+            if (p.contains(prefix)) {
+                return;
+            }
+        }
+        throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
+                + "Available prefixes: " + prefixes);
+    }
+
+    @Override
+    public void ready() {
+        LOG.debug("Readying transaction for shard {}", shardRoot);
+
+        Preconditions.checkState(cohort == null, "Transaction was readied already");
+        cohort = currentTx.ready();
+        currentTx = null;
+    }
+
+    @Override
+    public void close() {
+        if (cohort != null) {
+            cohort.abort();
+            cohort = null;
+        }
+        if (currentTx != null) {
+            currentTx.abort();
+            currentTx = null;
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> submit() {
+        LOG.debug("Submitting transaction for shard {}", shardRoot);
+
+        Preconditions.checkNotNull(cohort, "Transaction not readied yet");
+        return NULL_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> validate() {
+        LOG.debug("Validating transaction for shard {}", shardRoot);
+
+        Preconditions.checkNotNull(cohort, "Transaction not readied yet");
+        return VALIDATE_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> prepare() {
+        LOG.debug("Preparing transaction for shard {}", shardRoot);
+
+        Preconditions.checkNotNull(cohort, "Transaction not readied yet");
+        return NULL_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        LOG.debug("Committing transaction for shard {}", shardRoot);
+
+        Preconditions.checkNotNull(cohort, "Transaction not readied yet");
+        return NULL_FUTURE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
new file mode 100644 (file)
index 0000000..c1a099b
--- /dev/null
@@ -0,0 +1,533 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberExited;
+import akka.cluster.ClusterEvent.MemberRemoved;
+import akka.cluster.ClusterEvent.MemberUp;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
+import akka.cluster.ClusterEvent.ReachableMember;
+import akka.cluster.ClusterEvent.UnreachableMember;
+import akka.cluster.Member;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
+import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
+import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import scala.compat.java8.FutureConverters;
+
+/**
+ * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
+ * nodes of newly open producers/shards on the local node.
+ */
+public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
+
+    private static final String PERSISTENCE_ID = "sharding-service-actor";
+    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
+
+    private final DOMDataTreeService dataTreeService;
+    private final DOMDataTreeShardingService shardingService;
+    private final ActorSystem actorSystem;
+    private final ClusterWrapper cluster;
+    // helper actorContext used only for static calls to executeAsync etc
+    // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
+    private final ActorContext actorContext;
+    private final ShardingServiceAddressResolver resolver;
+    private final DistributedDataStore distributedConfigDatastore;
+    private final DistributedDataStore distributedOperDatastore;
+
+    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
+    private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
+
+    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
+        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
+
+        dataTreeService = builder.getDataTreeService();
+        shardingService = builder.getShardingService();
+        actorSystem = builder.getActorSystem();
+        cluster = builder.getClusterWrapper();
+        distributedConfigDatastore = builder.getDistributedConfigDatastore();
+        distributedOperDatastore = builder.getDistributedOperDatastore();
+        actorContext = distributedConfigDatastore.getActorContext();
+        resolver = new ShardingServiceAddressResolver(
+                DistributedShardedDOMDataTree.ACTOR_ID, cluster.getCurrentMemberName());
+
+        cluster.subscribeToMemberEvents(self());
+    }
+
+    @Override
+    protected void handleRecover(final Object message) throws Exception {
+        LOG.debug("Received a recover message {}", message);
+    }
+
+    @Override
+    protected void handleCommand(final Object message) throws Exception {
+        if (message instanceof ClusterEvent.MemberUp) {
+            memberUp((ClusterEvent.MemberUp) message);
+        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
+            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
+        } else if (message instanceof ClusterEvent.MemberExited) {
+            memberExited((ClusterEvent.MemberExited) message);
+        } else if (message instanceof ClusterEvent.MemberRemoved) {
+            memberRemoved((ClusterEvent.MemberRemoved) message);
+        } else if (message instanceof ClusterEvent.UnreachableMember) {
+            memberUnreachable((ClusterEvent.UnreachableMember) message);
+        } else if (message instanceof ClusterEvent.ReachableMember) {
+            memberReachable((ClusterEvent.ReachableMember) message);
+        } else if (message instanceof ProducerCreated) {
+            onProducerCreated((ProducerCreated) message);
+        } else if (message instanceof NotifyProducerCreated) {
+            onNotifyProducerCreated((NotifyProducerCreated) message);
+        } else if (message instanceof ProducerRemoved) {
+            onProducerRemoved((ProducerRemoved) message);
+        } else if (message instanceof NotifyProducerRemoved) {
+            onNotifyProducerRemoved((NotifyProducerRemoved) message);
+        } else if (message instanceof PrefixShardCreated) {
+            onPrefixShardCreated((PrefixShardCreated) message);
+        } else if (message instanceof CreatePrefixShard) {
+            onCreatePrefixShard((CreatePrefixShard) message);
+        } else if (message instanceof RemovePrefixShard) {
+            onRemovePrefixShard((RemovePrefixShard) message);
+        } else if (message instanceof PrefixShardRemoved) {
+            onPrefixShardRemoved((PrefixShardRemoved) message);
+        }
+    }
+
+    @Override
+    public String persistenceId() {
+        return PERSISTENCE_ID;
+    }
+
+    private void memberUp(final MemberUp message) {
+        final MemberName memberName = memberToName(message.member());
+
+        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        resolver.addPeerAddress(memberName, message.member().address());
+    }
+
+    private void memberWeaklyUp(final MemberWeaklyUp message) {
+        final MemberName memberName = memberToName(message.member());
+
+        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        resolver.addPeerAddress(memberName, message.member().address());
+    }
+
+    private void memberExited(final MemberExited message) {
+        final MemberName memberName = memberToName(message.member());
+
+        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        resolver.removePeerAddress(memberName);
+    }
+
+    private void memberRemoved(final MemberRemoved message) {
+        final MemberName memberName = memberToName(message.member());
+
+        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        resolver.removePeerAddress(memberName);
+    }
+
+    private void memberUnreachable(final UnreachableMember message) {
+        final MemberName memberName = memberToName(message.member());
+        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        resolver.removePeerAddress(memberName);
+    }
+
+    private void memberReachable(final ReachableMember message) {
+        final MemberName memberName = memberToName(message.member());
+        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        resolver.addPeerAddress(memberName, message.member().address());
+    }
+
+    private void onProducerCreated(final ProducerCreated message) {
+        LOG.debug("Received ProducerCreated: {}", message);
+        final ActorRef sender = getSender();
+        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
+
+        final List<CompletableFuture<Object>> futures = new ArrayList<>();
+
+        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
+            final ActorSelection actorSelection = actorSystem.actorSelection(address);
+            futures.add(
+                    FutureConverters.toJava(
+                            actorContext.executeOperationAsync(
+                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
+                    .toCompletableFuture());
+        }
+
+        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+                futures.toArray(new CompletableFuture[futures.size()]));
+
+        combinedFuture.thenRun(() -> {
+            for (final CompletableFuture<Object> future : futures) {
+                try {
+                    final Object result = future.get();
+                    if (result instanceof Status.Failure) {
+                        sender.tell(result, self());
+                        return;
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    sender.tell(new Status.Failure(e), self());
+                    return;
+                }
+            }
+            sender.tell(new Status.Success(null), noSender());
+        }).exceptionally(throwable -> {
+            sender.tell(new Status.Failure(throwable), self());
+            return null;
+        });
+    }
+
+    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
+        LOG.debug("Received NotifyProducerCreated: {}", message);
+
+        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
+
+        try {
+            final ActorProducerRegistration registration =
+                    new ActorProducerRegistration(dataTreeService.createProducer(subtrees), subtrees);
+            subtrees.forEach(id -> idToProducer.put(id, registration));
+            sender().tell(new Status.Success(null), self());
+        } catch (final IllegalArgumentException e) {
+            sender().tell(new Status.Failure(e), getSelf());
+        }
+    }
+
+    private void onProducerRemoved(final ProducerRemoved message) {
+        LOG.debug("Received ProducerRemoved: {}", message);
+
+        final List<CompletableFuture<Object>> futures = new ArrayList<>();
+
+        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
+            final ActorSelection selection = actorSystem.actorSelection(address);
+
+            futures.add(FutureConverters.toJava(
+                    actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
+                    .toCompletableFuture());
+        }
+
+        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+                futures.toArray(new CompletableFuture[futures.size()]));
+
+        final ActorRef respondTo = getSender();
+
+        combinedFuture
+                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
+                .exceptionally(e -> {
+                    respondTo.tell(new Status.Failure(null), self());
+                    return null;
+                });
+
+    }
+
+    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
+        LOG.debug("Received NotifyProducerRemoved: {}", message);
+
+        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
+        if (registration == null) {
+            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
+            getSender().tell(new Status.Success(null), noSender());
+            return;
+        }
+
+        try {
+            registration.close();
+            getSender().tell(new Status.Success(null), noSender());
+        } catch (final DOMDataTreeProducerException e) {
+            LOG.error("Unable to close producer", e);
+            getSender().tell(new Status.Failure(e), noSender());
+        }
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void onCreatePrefixShard(final CreatePrefixShard message) {
+        LOG.debug("Received CreatePrefixShard: {}", message);
+
+        final PrefixShardConfiguration configuration = message.getConfiguration();
+
+        final DOMDataTreeProducer producer =
+                dataTreeService.createProducer(Collections.singleton(configuration.getPrefix()));
+
+        final DistributedDataStore distributedDataStore =
+                configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION
+                        ? distributedConfigDatastore : distributedOperDatastore;
+        final String shardName = ClusterUtils.getCleanShardName(configuration.getPrefix().getRootIdentifier());
+        LOG.debug("Creating distributed datastore client for shard {}", shardName);
+        final Props distributedDataStoreClientProps =
+                SimpleDataStoreClientActor.props(cluster.getCurrentMemberName(),
+                        "Shard-" + shardName, distributedDataStore.getActorContext(), shardName);
+
+        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+        final DataStoreClient client;
+        try {
+            client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (final Exception e) {
+            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            throw Throwables.propagate(e);
+        }
+
+        try {
+            final ListenerRegistration<ShardFrontend> shardFrontendRegistration =
+                    shardingService.registerDataTreeShard(configuration.getPrefix(),
+                            new ShardFrontend(
+                                    client,
+                                    configuration.getPrefix()
+                            ),
+                            producer);
+            idToShardRegistration.put(configuration.getPrefix(),
+                    new ShardFrontendRegistration(clientActor, shardFrontendRegistration));
+
+            sender().tell(new Status.Success(null), self());
+        } catch (final DOMDataTreeShardingConflictException e) {
+            LOG.error("Unable to create shard", e);
+            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            sender().tell(new Status.Failure(e), self());
+        } finally {
+            try {
+                producer.close();
+            } catch (final DOMDataTreeProducerException e) {
+                LOG.error("Unable to close producer that was used for shard registration {}", producer, e);
+            }
+        }
+    }
+
+    private void onPrefixShardCreated(final PrefixShardCreated message) {
+        LOG.debug("Received PrefixShardCreated: {}", message);
+
+        final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
+        final ActorRef sender = getSender();
+
+        final List<CompletableFuture<Object>> futures = new ArrayList<>();
+
+        for (final String address : addresses) {
+            final ActorSelection actorSelection = actorSystem.actorSelection(address);
+            futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
+                    new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
+        }
+
+        final CompletableFuture<Void> combinedFuture =
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
+
+        combinedFuture.thenRun(() -> {
+            for (final CompletableFuture<Object> future : futures) {
+                try {
+                    final Object result = future.get();
+                    if (result instanceof Status.Failure) {
+                        sender.tell(result, self());
+                        return;
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    sender.tell(new Status.Failure(e), self());
+                    return;
+                }
+            }
+            sender.tell(new Status.Success(null), self());
+        }).exceptionally(throwable -> {
+            sender.tell(new Status.Failure(throwable), self());
+            return null;
+        });
+    }
+
+    private void onRemovePrefixShard(final RemovePrefixShard message) {
+        LOG.debug("Received RemovePrefixShard: {}", message);
+
+        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
+            final ActorSelection selection = actorContext.actorSelection(address);
+            selection.tell(new PrefixShardRemoved(message.getPrefix()), getSelf());
+        }
+    }
+
+    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+        LOG.debug("Received PrefixShardRemoved: {}", message);
+
+        final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
+
+        if (registration == null) {
+            LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}",
+                    message.getPrefix(), idToShardRegistration);
+            return;
+        }
+
+        registration.close();
+    }
+
+    private static MemberName memberToName(final Member member) {
+        return MemberName.forName(member.roles().iterator().next());
+    }
+
+    private class ActorProducerRegistration {
+
+        private final DOMDataTreeProducer producer;
+        private final Collection<DOMDataTreeIdentifier> subtrees;
+
+        ActorProducerRegistration(final DOMDataTreeProducer producer,
+                                  final Collection<DOMDataTreeIdentifier> subtrees) {
+            this.producer = producer;
+            this.subtrees = subtrees;
+        }
+
+        void close() throws DOMDataTreeProducerException {
+            producer.close();
+            subtrees.forEach(idToProducer::remove);
+        }
+    }
+
+    private static class ShardFrontendRegistration extends
+            AbstractObjectRegistration<ListenerRegistration<ShardFrontend>> {
+
+        private final ActorRef clientActor;
+        private final ListenerRegistration<ShardFrontend> shardRegistration;
+
+        ShardFrontendRegistration(final ActorRef clientActor,
+                                         final ListenerRegistration<ShardFrontend> shardRegistration) {
+            super(shardRegistration);
+            this.clientActor = clientActor;
+            this.shardRegistration = shardRegistration;
+        }
+
+        @Override
+        protected void removeRegistration() {
+            shardRegistration.close();
+            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }
+    }
+
+    public static class ShardedDataTreeActorCreator {
+
+        private DOMDataTreeService dataTreeService;
+        private DOMDataTreeShardingService shardingService;
+        private DistributedDataStore distributedConfigDatastore;
+        private DistributedDataStore distributedOperDatastore;
+        private ActorSystem actorSystem;
+        private ClusterWrapper cluster;
+
+        public DOMDataTreeService getDataTreeService() {
+            return dataTreeService;
+        }
+
+        public ShardedDataTreeActorCreator setDataTreeService(final DOMDataTreeService dataTreeService) {
+            this.dataTreeService = dataTreeService;
+            return this;
+        }
+
+        public DOMDataTreeShardingService getShardingService() {
+            return shardingService;
+        }
+
+        public ShardedDataTreeActorCreator setShardingService(final DOMDataTreeShardingService shardingService) {
+            this.shardingService = shardingService;
+            return this;
+        }
+
+        public ActorSystem getActorSystem() {
+            return actorSystem;
+        }
+
+        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
+            this.actorSystem = actorSystem;
+            return this;
+        }
+
+        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
+            this.cluster = cluster;
+            return this;
+        }
+
+        public ClusterWrapper getClusterWrapper() {
+            return cluster;
+        }
+
+        public DistributedDataStore getDistributedConfigDatastore() {
+            return distributedConfigDatastore;
+        }
+
+        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
+                final DistributedDataStore distributedConfigDatastore) {
+            this.distributedConfigDatastore = distributedConfigDatastore;
+            return this;
+        }
+
+        public DistributedDataStore getDistributedOperDatastore() {
+            return distributedOperDatastore;
+        }
+
+        public ShardedDataTreeActorCreator setDistributedOperDatastore(
+                final DistributedDataStore distributedOperDatastore) {
+            this.distributedOperDatastore = distributedOperDatastore;
+            return this;
+        }
+
+        private void verify() {
+            Preconditions.checkNotNull(dataTreeService);
+            Preconditions.checkNotNull(shardingService);
+            Preconditions.checkNotNull(actorSystem);
+            Preconditions.checkNotNull(cluster);
+            Preconditions.checkNotNull(distributedConfigDatastore);
+            Preconditions.checkNotNull(distributedOperDatastore);
+        }
+
+        public Props props() {
+            verify();
+            return Props.create(ShardedDataTreeActor.class, this);
+        }
+
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardingServiceAddressResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardingServiceAddressResolver.java
new file mode 100644 (file)
index 0000000..b443a69
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import akka.actor.Address;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+
+/**
+ * Resolver for remote {@link ShardedDataTreeActor}'s.
+ */
+public class ShardingServiceAddressResolver {
+
+    private final ConcurrentMap<MemberName, Address> memberNameToAddress = new ConcurrentHashMap<>();
+    private final String shardingServiceActorIdentifier;
+    private final MemberName localMemberName;
+
+    public ShardingServiceAddressResolver(final String shardingServiceActorIdentifier,
+                                          final MemberName localMemberName) {
+        this.shardingServiceActorIdentifier = shardingServiceActorIdentifier;
+        this.localMemberName = localMemberName;
+    }
+
+    void addPeerAddress(final MemberName memberName, final Address address) {
+        memberNameToAddress.put(memberName, address);
+    }
+
+    void removePeerAddress(final MemberName memberName) {
+        memberNameToAddress.remove(memberName);
+    }
+
+    Address getPeerAddress(final MemberName memberName) {
+        return memberNameToAddress.get(memberName);
+    }
+
+    StringBuilder getActorPathBuilder(final Address address) {
+        return new StringBuilder().append(address.toString()).append("/user/").append(shardingServiceActorIdentifier);
+    }
+
+    Collection<String> getShardingServicePeerActorAddresses() {
+        final Collection<String> peerAddresses =
+                memberNameToAddress
+                        .entrySet()
+                        .stream()
+                        .filter(entry -> !localMemberName.equals(entry.getKey()))
+                        .map(entry -> getActorPathBuilder(entry.getValue()).toString())
+                        .collect(Collectors.toList());
+
+        return peerAddresses;
+    }
+
+    public String resolve(final MemberName memberName) {
+        Preconditions.checkNotNull(memberName);
+        final Address address = memberNameToAddress.get(memberName);
+        Preconditions.checkNotNull(address, "Requested member[%s] is not present in the resolver ",
+                memberName.toString());
+
+        return getActorPathBuilder(address).toString();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/CreatePrefixShard.java
new file mode 100644 (file)
index 0000000..20c0ea9
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+
+/**
+ * Sent to the local {@link ShardedDataTreeActor} when there was a shard created
+ * on the local node. The local actor should notify the remote actors with {@link PrefixShardCreated} which should
+ * create the required frontend/backend shards.
+ */
+@Beta
+public class CreatePrefixShard implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final PrefixShardConfiguration configuration;
+
+    public CreatePrefixShard(final PrefixShardConfiguration configuration) {
+        this.configuration = Preconditions.checkNotNull(configuration);
+    }
+
+    public PrefixShardConfiguration getConfiguration() {
+        return configuration;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerCreated.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerCreated.java
new file mode 100644 (file)
index 0000000..fb7f017
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Message sent to remote {@link ShardedDataTreeActor}'s when attempting
+ * to create a producer. The remote node should attempt to create a producer in the local sharding service and reply
+ * with success/failure based on the attempt result.
+ */
+@Beta
+public class NotifyProducerCreated implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final Collection<DOMDataTreeIdentifier> subtrees;
+
+    public NotifyProducerCreated(final Collection<DOMDataTreeIdentifier> subtrees) {
+        this.subtrees = ImmutableList.copyOf(Preconditions.checkNotNull(subtrees));
+    }
+
+    public Collection<DOMDataTreeIdentifier> getSubtrees() {
+        return subtrees;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerRemoved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/NotifyProducerRemoved.java
new file mode 100644 (file)
index 0000000..24d871e
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Message sent to remote {@link ShardedDataTreeActor}'s when attempting
+ * to close a producer. The remote node should attempt to close a producer in the local sharding service and reply
+ * with success/failure based on the attempt result. If the producer doesn't exist on this node report Success.
+ */
+@Beta
+public class NotifyProducerRemoved implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final Collection<DOMDataTreeIdentifier> subtrees;
+
+    public NotifyProducerRemoved(final Collection<DOMDataTreeIdentifier> subtrees) {
+        this.subtrees = ImmutableList.copyOf(Preconditions.checkNotNull(subtrees));
+    }
+
+    public Collection<DOMDataTreeIdentifier> getSubtrees() {
+        return subtrees;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardCreated.java
new file mode 100644 (file)
index 0000000..f7113ab
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+
+/**
+ * Message sent to the local {@link ShardedDataTreeActor} when a clustered
+ * shard was created locally. The backend shards/replicas will be handled by the ShardManager but the
+ * {@link ShardedDataTreeActor} needs to handle the registration of the
+ * frontends into the {@link org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService}. The configuration only contains
+ * the Member nodes that this is still yet to be distributed to. The last node will receive PrefixShardConfiguration
+ * with only it's member present.
+ */
+@Beta
+public class PrefixShardCreated {
+    private final PrefixShardConfiguration configuration;
+
+    public PrefixShardCreated(final PrefixShardConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public PrefixShardConfiguration getConfiguration() {
+        return configuration;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/PrefixShardRemoved.java
new file mode 100644 (file)
index 0000000..5f84071
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import java.io.Serializable;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Message sent to remote {@link ShardedDataTreeActor}'s when there is an
+ * attempt to remove a shard from the sharding service.
+ */
+@Beta
+public class PrefixShardRemoved implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final DOMDataTreeIdentifier prefix;
+
+    public PrefixShardRemoved(final DOMDataTreeIdentifier prefix) {
+        this.prefix = prefix;
+    }
+
+    public DOMDataTreeIdentifier getPrefix() {
+        return prefix;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerCreated.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerCreated.java
new file mode 100644 (file)
index 0000000..efdf557
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Message sent to local {@link ShardedDataTreeActor}'s when there was an
+ * attempt to create a producer on the local node.
+ */
+@Beta
+public class ProducerCreated {
+    private final Collection<DOMDataTreeIdentifier> subtrees;
+
+    public ProducerCreated(final Collection<DOMDataTreeIdentifier> subtrees) {
+        this.subtrees = subtrees;
+    }
+
+    public Collection<DOMDataTreeIdentifier> getSubtrees() {
+        return subtrees;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerRemoved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/ProducerRemoved.java
new file mode 100644 (file)
index 0000000..1cc33a0
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Message sent to local {@link ShardedDataTreeActor}'s when there was an
+ * attempt to close a producer on the local node.
+ */
+@Beta
+public class ProducerRemoved {
+
+    private final Collection<DOMDataTreeIdentifier> subtrees;
+
+    public ProducerRemoved(final Collection<DOMDataTreeIdentifier> subtrees) {
+        this.subtrees = subtrees;
+    }
+
+    public Collection<DOMDataTreeIdentifier> getSubtrees() {
+        return subtrees;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/messages/RemovePrefixShard.java
new file mode 100644 (file)
index 0000000..fa74af6
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node. The local actor
+ * should then notify the remote nodes of the Removal with {@link PrefixShardRemoved} message.
+ */
+public class RemovePrefixShard {
+
+    private final DOMDataTreeIdentifier prefix;
+
+    public RemovePrefixShard(final DOMDataTreeIdentifier prefix) {
+
+        this.prefix = Preconditions.checkNotNull(prefix);
+    }
+
+    public DOMDataTreeIdentifier getPrefix() {
+        return prefix;
+    }
+}
index 1d501e1..f24ac0e 100644 (file)
@@ -29,6 +29,7 @@ import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -99,6 +100,28 @@ public class IntegrationTestKit extends ShardTestKit {
         return dataStore;
     }
 
+    public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
+                                                                       final SchemaContext schemaContext) {
+        final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
+        final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
+
+        getDatastoreContextBuilder().dataStoreName(typeName);
+
+        final DatastoreContext datastoreContext = getDatastoreContextBuilder().build();
+
+        final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+
+        final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
+                configuration, mockContextFactory, restoreFromSnapshot);
+
+        dataStore.onGlobalContextUpdated(schemaContext);
+
+        datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
+        return dataStore;
+    }
+
     public void waitUntilLeader(final ActorContext actorContext, final String... shardNames) {
         for (String shardName: shardNames) {
             ActorRef shard = findLocalShard(actorContext, shardName);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java
new file mode 100644 (file)
index 0000000..deb6aa5
--- /dev/null
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.cluster.Cluster;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+
+public class DistributedShardedDOMDataTreeTest extends AbstractTest {
+
+    private static final Address MEMBER_1_ADDRESS =
+            AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+
+    private static final DOMDataTreeIdentifier TEST_ID =
+            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+    private ShardedDOMDataTree shardedDOMDataTree = new ShardedDOMDataTree();
+
+    private ActorSystem leaderSystem;
+    private ActorSystem followerSystem;
+
+
+    private final Builder leaderDatastoreContextBuilder =
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+
+    private final DatastoreContext.Builder followerDatastoreContextBuilder =
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
+                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+    private DistributedDataStore followerDistributedDataStore;
+    private DistributedDataStore leaderDistributedDataStore;
+    private IntegrationTestKit followerTestKit;
+    private IntegrationTestKit leaderTestKit;
+
+    private DistributedShardedDOMDataTree leaderShardFactory;
+    private DistributedShardedDOMDataTree followerShardFactory;
+
+    @Before
+    public void setUp() {
+        shardedDOMDataTree = new ShardedDOMDataTree();
+
+        leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
+
+        followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+        Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
+    }
+
+    @After
+    public void tearDown() {
+        if (followerDistributedDataStore != null) {
+            leaderDistributedDataStore.close();
+        }
+        if (leaderDistributedDataStore != null) {
+            leaderDistributedDataStore.close();
+        }
+
+        JavaTestKit.shutdownActorSystem(leaderSystem);
+        JavaTestKit.shutdownActorSystem(followerSystem);
+    }
+
+    private void initEmptyDatastore(final String type) {
+        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+
+        leaderDistributedDataStore =
+                leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+
+        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+        followerDistributedDataStore =
+                followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+
+        leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
+                Mockito.mock(DistributedDataStore.class),
+                leaderDistributedDataStore);
+
+        followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
+                Mockito.mock(DistributedDataStore.class),
+                followerDistributedDataStore);
+    }
+
+    @Test
+    public void testProducerRegistrations() throws Exception {
+        initEmptyDatastore("config");
+
+        final DistributedShardRegistration shardRegistration =
+                leaderShardFactory.createDistributedShard(TEST_ID,
+                        Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+
+        leaderShardManager.tell(
+                new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef());
+        leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalShardFound.class);
+
+        IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+        leaderShardManager.tell(
+                new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef());
+        leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalPrimaryShardFound.class);
+
+        final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
+        followerShardManager.tell(
+                new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
+        followerTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), RemotePrimaryShardFound.class);
+
+        final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+        try {
+            followerShardFactory.createProducer(Collections.singleton(TEST_ID));
+            fail("Producer should be already registered on the other node");
+        } catch (final IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("is attached to producer"));
+        }
+
+        producer.close();
+
+        final DOMDataTreeProducer followerProducer =
+                followerShardFactory.createProducer(Collections.singleton(TEST_ID));
+        try {
+            leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+            fail("Producer should be already registered on the other node");
+        } catch (final IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("is attached to producer"));
+        }
+
+        followerProducer.close();
+        // try to create a shard on an already registered prefix on follower
+        try {
+            followerShardFactory.createDistributedShard(TEST_ID,
+                    Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+            fail("This prefix already should have a shard registration that was forwarded from the other node");
+        } catch (final DOMDataTreeShardingConflictException e) {
+            assertTrue(e.getMessage().contains("is already occupied by shard"));
+        }
+    }
+
+    @Test
+    @Ignore("Needs some other stuff related to 5280")
+    public void testWriteIntoMultipleShards() throws Exception {
+        initEmptyDatastore("config");
+
+        final DistributedShardRegistration shardRegistration =
+                leaderShardFactory.createDistributedShard(
+                        TEST_ID,Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+                ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+
+        new JavaTestKit(leaderSystem) {
+            {
+                leaderShardManager.tell(
+                        new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+                final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
+
+                followerShardManager.tell(
+                        new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+                leaderDistributedDataStore.getActorContext().getShardManager().tell(
+                        new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
+            }
+        };
+
+        final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+
+        final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
+        final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
+        Assert.assertNotNull(cursor);
+        final YangInstanceIdentifier nameId =
+                YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
+        cursor.write(nameId.getLastPathArgument(),
+                ImmutableLeafNodeBuilder.<String>create()
+                        .withNodeIdentifier(new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
+
+        cursor.close();
+        tx.submit();
+
+
+    }
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.