--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.annotations.Beta;
+import javax.annotation.Nonnull;
+
+/**
+ * Exception thrown when there was a at any point during the creation of a shard via {@link DistributedShardFactory}.
+ */
+@Beta
+public class DOMDataTreeShardCreationFailedException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public DOMDataTreeShardCreationFailedException(final @Nonnull String message) {
+ super(message);
+ }
+
+ public DOMDataTreeShardCreationFailedException(final @Nonnull String message, final @Nonnull Throwable cause) {
+ super(message, cause);
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.cluster.datastore;
+package org.opendaylight.controller.cluster.sharding;
+import com.google.common.annotations.Beta;
import java.util.Collection;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
* all the boilerplate that comes with registration of a new clustered shard into the system and creating the backend
* shard/replicas that come along with it.
*/
+@Beta
public interface DistributedShardFactory {
/**
*/
DistributedShardRegistration createDistributedShard(DOMDataTreeIdentifier prefix,
Collection<MemberName> replicaMembers)
- throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException;
+ throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException,
+ DOMDataTreeShardCreationFailedException;
interface DistributedShardRegistration extends Registration {
@Override
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
+import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+/**
+ * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
+ * {@link ShardedDataTreeActor}. Also provides QoL method for addition of prefix based clustered shard into the system.
+ */
+public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService,
+ DistributedShardFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);
+
+ private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
+ private static final int MAX_ACTOR_CREATION_RETRIES = 100;
+ private static final int ACTOR_RETRY_DELAY = 100;
+ private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
+
+ static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";
+
+ private final ShardedDOMDataTree shardedDOMDataTree;
+ private final ActorSystem actorSystem;
+ private final DistributedDataStore distributedOperDatastore;
+ private final DistributedDataStore distributedConfigDatastore;
+
+ private final ActorRef shardedDataTreeActor;
+ private final MemberName memberName;
+
+ public DistributedShardedDOMDataTree(final ActorSystem actorSystem,
+ final DistributedDataStore distributedOperDatastore,
+ final DistributedDataStore distributedConfigDatastore) {
+ this.actorSystem = Preconditions.checkNotNull(actorSystem);
+ this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore);
+ this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore);
+ shardedDOMDataTree = new ShardedDOMDataTree();
+
+ shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
+ new ShardedDataTreeActorCreator()
+ .setDataTreeService(shardedDOMDataTree)
+ .setShardingService(shardedDOMDataTree)
+ .setActorSystem(actorSystem)
+ .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
+ .setDistributedConfigDatastore(distributedConfigDatastore)
+ .setDistributedOperDatastore(distributedOperDatastore),
+ ACTOR_ID);
+
+ this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();
+ }
+
+ @Nonnull
+ @Override
+ public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
+ final T listener, final Collection<DOMDataTreeIdentifier> subtrees,
+ final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers)
+ throws DOMDataTreeLoopException {
+
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Nonnull
+ @Override
+ public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
+ LOG.debug("Creating producer for {}", subtrees);
+ final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);
+
+ final Object response = distributedConfigDatastore.getActorContext()
+ .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
+ if (response == null) {
+ LOG.debug("Received success from remote nodes, creating producer:{}", subtrees);
+ return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
+ distributedConfigDatastore.getActorContext());
+ } else if (response instanceof Exception) {
+ closeProducer(producer);
+ throw Throwables.propagate((Exception) response);
+ } else {
+ closeProducer(producer);
+ throw new RuntimeException("Unexpected response to create producer received." + response);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public DistributedShardRegistration createDistributedShard(
+ final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
+ throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException,
+ DOMDataTreeShardCreationFailedException {
+
+ final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
+ final DistributedDataStore distributedDataStore =
+ prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
+ ? distributedConfigDatastore : distributedOperDatastore;
+
+ final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
+ if (replicaMembers.contains(memberName)) {
+ // spawn the backend shard and have the shard Manager create all replicas
+ final ActorRef shardManager = distributedDataStore.getActorContext().getShardManager();
+
+ shardManager.tell(new CreatePrefixedShard(config, null, Shard.builder()), noSender());
+ }
+
+ LOG.debug("Creating distributed datastore client for shard {}", shardName);
+ final Props distributedDataStoreClientProps =
+ SimpleDataStoreClientActor
+ .props(memberName, "Shard-" + shardName, distributedDataStore.getActorContext(), shardName);
+
+ final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+ final DataStoreClient client;
+ try {
+ client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (final Exception e) {
+ LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), noSender());
+ throw new DOMDataTreeProducerException("Unable to create producer", e);
+ }
+
+ // register the frontend into the sharding service and let the actor distribute this onto the other nodes
+ final ListenerRegistration<ShardFrontend> shardFrontendRegistration;
+ try (DOMDataTreeProducer producer = createProducer(Collections.singletonList(prefix))) {
+ shardFrontendRegistration = shardedDOMDataTree
+ .registerDataTreeShard(prefix,
+ new ShardFrontend(client, prefix),
+ ((ProxyProducer) producer).delegate());
+ }
+
+ final Future<Object> future = distributedDataStore.getActorContext()
+ .executeOperationAsync(shardedDataTreeActor, new PrefixShardCreated(config), DEFAULT_ASK_TIMEOUT);
+ try {
+ final Object result = Await.result(future, DEFAULT_ASK_TIMEOUT.duration());
+ if (result != null) {
+ throw new DOMDataTreeShardCreationFailedException("Received unexpected response to PrefixShardCreated"
+ + result);
+ }
+
+ return new DistributedShardRegistrationImpl(shardFrontendRegistration, prefix, shardedDataTreeActor);
+ } catch (final CompletionException e) {
+ shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
+ clientActor.tell(PoisonPill.getInstance(), noSender());
+
+ final Throwable cause = e.getCause();
+ if (cause instanceof DOMDataTreeShardingConflictException) {
+ throw (DOMDataTreeShardingConflictException) cause;
+ }
+
+ throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e.getCause());
+ } catch (final Exception e) {
+ shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
+ clientActor.tell(PoisonPill.getInstance(), noSender());
+
+ throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e);
+ }
+ }
+
+ @Nonnull
+ @Override
+ public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
+ @Nonnull final DOMDataTreeIdentifier prefix,
+ @Nonnull final T shard,
+ @Nonnull final DOMDataTreeProducer producer)
+ throws DOMDataTreeShardingConflictException {
+
+ LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix);
+
+ return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
+ }
+
+ private static void closeProducer(final DOMDataTreeProducer producer) {
+ try {
+ producer.close();
+ } catch (final DOMDataTreeProducerException e) {
+ LOG.error("Unable to close producer", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem,
+ final ShardedDataTreeActorCreator creator,
+ final String shardDataTreeActorId) {
+ Exception lastException = null;
+
+ for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) {
+ try {
+ return actorSystem.actorOf(creator.props(), shardDataTreeActorId);
+ } catch (final Exception e) {
+ lastException = e;
+ Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT);
+ LOG.debug("Could not create actor {} because of {} -"
+ + " waiting for sometime before retrying (retry count = {})",
+ shardDataTreeActorId, e.getMessage(), i);
+ }
+ }
+
+ throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
+ }
+
+ private static class DistributedShardRegistrationImpl implements DistributedShardRegistration {
+ private final ListenerRegistration<ShardFrontend> registration;
+ private final DOMDataTreeIdentifier prefix;
+ private final ActorRef shardedDataTreeActor;
+
+ DistributedShardRegistrationImpl(final ListenerRegistration<ShardFrontend> registration,
+ final DOMDataTreeIdentifier prefix,
+ final ActorRef shardedDataTreeActor) {
+ this.registration = registration;
+ this.prefix = prefix;
+ this.shardedDataTreeActor = shardedDataTreeActor;
+ }
+
+ @Override
+ public void close() {
+ // TODO send the correct messages to ShardManager to destroy the shard
+ // maybe we could provide replica removal mechanisms also?
+ shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender());
+ registration.close();
+ }
+ }
+
+ private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {
+
+ private final DOMDataTreeProducer delegate;
+ private final Collection<DOMDataTreeIdentifier> subtrees;
+ private final ActorRef shardDataTreeActor;
+ private final ActorContext actorContext;
+
+ ProxyProducer(final DOMDataTreeProducer delegate,
+ final Collection<DOMDataTreeIdentifier> subtrees,
+ final ActorRef shardDataTreeActor,
+ final ActorContext actorContext) {
+ this.delegate = Preconditions.checkNotNull(delegate);
+ this.subtrees = Preconditions.checkNotNull(subtrees);
+ this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ }
+
+ @Nonnull
+ @Override
+ public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
+ return delegate.createTransaction(isolated);
+ }
+
+ @Nonnull
+ @Override
+ public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
+ // TODO we probably don't need to distribute this on the remote nodes since once we have this producer
+ // open we surely have the rights to all the subtrees.
+ return delegate.createProducer(subtrees);
+ }
+
+ @Override
+ public void close() throws DOMDataTreeProducerException {
+ delegate.close();
+
+ final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
+ if (o instanceof DOMDataTreeProducerException) {
+ throw ((DOMDataTreeProducerException) o);
+ } else if (o instanceof Throwable) {
+ throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
+ }
+ }
+
+ @Override
+ protected DOMDataTreeProducer delegate() {
+ return delegate;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
+import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Proxy implementation of a shard that creates forwarding producers to the backend shard.
+ */
+class ShardFrontend implements ReadableWriteableDOMDataTreeShard {
+
+ private final DataStoreClient client;
+ private final DOMDataTreeIdentifier shardRoot;
+
+ ShardFrontend(final DataStoreClient client,
+ final DOMDataTreeIdentifier shardRoot) {
+ this.client = Preconditions.checkNotNull(client);
+ this.shardRoot = Preconditions.checkNotNull(shardRoot);
+ }
+
+ @Override
+ public DOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> paths) {
+ return new ShardProxyProducer(shardRoot, paths, client);
+ }
+
+ @Override
+ public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+ // TODO message directly into the shard
+ }
+
+ @Override
+ public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+ // TODO message directly into the shard
+ }
+
+ @Nonnull
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+ final YangInstanceIdentifier treeId, final L listener) {
+ throw new UnsupportedOperationException("Registering data tree change listener is not supported");
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
+
+/**
+ * Proxy producer implementation that creates transactions that forward all calls to {@link DataStoreClient}.
+ */
+class ShardProxyProducer implements DOMDataTreeShardProducer {
+
+ private final DOMDataTreeIdentifier shardRoot;
+ private final Collection<DOMDataTreeIdentifier> prefixes;
+ private final DataStoreClient client;
+
+ ShardProxyProducer(final DOMDataTreeIdentifier shardRoot, final Collection<DOMDataTreeIdentifier> prefixes,
+ final DataStoreClient client) {
+ this.shardRoot = Preconditions.checkNotNull(shardRoot);
+ this.prefixes = ImmutableList.copyOf(Preconditions.checkNotNull(prefixes));
+ this.client = Preconditions.checkNotNull(client);
+ }
+
+ @Nonnull
+ @Override
+ public Collection<DOMDataTreeIdentifier> getPrefixes() {
+ return prefixes;
+ }
+
+ @Override
+ public DOMDataTreeShardWriteTransaction createTransaction() {
+ return new ShardProxyTransaction(shardRoot, prefixes, client);
+ }
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
+ * {@link ClientTransaction} calls.
+ */
+class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
+ private static final ListenableFuture<Void> NULL_FUTURE = Futures.immediateFuture(null);
+ private static final ListenableFuture<Boolean> VALIDATE_FUTURE = Futures.immediateFuture(true);
+
+ private final DOMDataTreeIdentifier shardRoot;
+ private final Collection<DOMDataTreeIdentifier> prefixes;
+ private final DataStoreClient client;
+ private final ClientLocalHistory history;
+ private ClientTransaction currentTx;
+ private DOMStoreThreePhaseCommitCohort cohort;
+
+
+ ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot, final Collection<DOMDataTreeIdentifier> prefixes,
+ final DataStoreClient client) {
+ this.shardRoot = Preconditions.checkNotNull(shardRoot);
+ this.prefixes = Preconditions.checkNotNull(prefixes);
+ this.client = Preconditions.checkNotNull(client);
+ history = client.createLocalHistory();
+ currentTx = history.createTransaction();
+ }
+
+ @Nonnull
+ @Override
+ public DOMDataTreeWriteCursor createCursor(@Nonnull final DOMDataTreeIdentifier prefix) {
+ checkAvailable(prefix);
+
+ return currentTx.openCursor();
+ }
+
+ private void checkAvailable(final DOMDataTreeIdentifier prefix) {
+ for (final DOMDataTreeIdentifier p : prefixes) {
+ if (p.contains(prefix)) {
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
+ + "Available prefixes: " + prefixes);
+ }
+
+ @Override
+ public void ready() {
+ LOG.debug("Readying transaction for shard {}", shardRoot);
+
+ Preconditions.checkState(cohort == null, "Transaction was readied already");
+ cohort = currentTx.ready();
+ currentTx = null;
+ }
+
+ @Override
+ public void close() {
+ if (cohort != null) {
+ cohort.abort();
+ cohort = null;
+ }
+ if (currentTx != null) {
+ currentTx.abort();
+ currentTx = null;
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> submit() {
+ LOG.debug("Submitting transaction for shard {}", shardRoot);
+
+ Preconditions.checkNotNull(cohort, "Transaction not readied yet");
+ return NULL_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> validate() {
+ LOG.debug("Validating transaction for shard {}", shardRoot);
+
+ Preconditions.checkNotNull(cohort, "Transaction not readied yet");
+ return VALIDATE_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Void> prepare() {
+ LOG.debug("Preparing transaction for shard {}", shardRoot);
+
+ Preconditions.checkNotNull(cohort, "Transaction not readied yet");
+ return NULL_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ LOG.debug("Committing transaction for shard {}", shardRoot);
+
+ Preconditions.checkNotNull(cohort, "Transaction not readied yet");
+ return NULL_FUTURE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberExited;
+import akka.cluster.ClusterEvent.MemberRemoved;
+import akka.cluster.ClusterEvent.MemberUp;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
+import akka.cluster.ClusterEvent.ReachableMember;
+import akka.cluster.ClusterEvent.UnreachableMember;
+import akka.cluster.Member;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
+import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
+import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
+import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
+import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import scala.compat.java8.FutureConverters;
+
+/**
+ * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
+ * nodes of newly open producers/shards on the local node.
+ */
+public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
+
+ private static final String PERSISTENCE_ID = "sharding-service-actor";
+ private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
+
+ private final DOMDataTreeService dataTreeService;
+ private final DOMDataTreeShardingService shardingService;
+ private final ActorSystem actorSystem;
+ private final ClusterWrapper cluster;
+ // helper actorContext used only for static calls to executeAsync etc
+ // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
+ private final ActorContext actorContext;
+ private final ShardingServiceAddressResolver resolver;
+ private final DistributedDataStore distributedConfigDatastore;
+ private final DistributedDataStore distributedOperDatastore;
+
+ private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
+ private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
+
+ ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
+ LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
+
+ dataTreeService = builder.getDataTreeService();
+ shardingService = builder.getShardingService();
+ actorSystem = builder.getActorSystem();
+ cluster = builder.getClusterWrapper();
+ distributedConfigDatastore = builder.getDistributedConfigDatastore();
+ distributedOperDatastore = builder.getDistributedOperDatastore();
+ actorContext = distributedConfigDatastore.getActorContext();
+ resolver = new ShardingServiceAddressResolver(
+ DistributedShardedDOMDataTree.ACTOR_ID, cluster.getCurrentMemberName());
+
+ cluster.subscribeToMemberEvents(self());
+ }
+
+ @Override
+ protected void handleRecover(final Object message) throws Exception {
+ LOG.debug("Received a recover message {}", message);
+ }
+
+ @Override
+ protected void handleCommand(final Object message) throws Exception {
+ if (message instanceof ClusterEvent.MemberUp) {
+ memberUp((ClusterEvent.MemberUp) message);
+ } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
+ memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
+ } else if (message instanceof ClusterEvent.MemberExited) {
+ memberExited((ClusterEvent.MemberExited) message);
+ } else if (message instanceof ClusterEvent.MemberRemoved) {
+ memberRemoved((ClusterEvent.MemberRemoved) message);
+ } else if (message instanceof ClusterEvent.UnreachableMember) {
+ memberUnreachable((ClusterEvent.UnreachableMember) message);
+ } else if (message instanceof ClusterEvent.ReachableMember) {
+ memberReachable((ClusterEvent.ReachableMember) message);
+ } else if (message instanceof ProducerCreated) {
+ onProducerCreated((ProducerCreated) message);
+ } else if (message instanceof NotifyProducerCreated) {
+ onNotifyProducerCreated((NotifyProducerCreated) message);
+ } else if (message instanceof ProducerRemoved) {
+ onProducerRemoved((ProducerRemoved) message);
+ } else if (message instanceof NotifyProducerRemoved) {
+ onNotifyProducerRemoved((NotifyProducerRemoved) message);
+ } else if (message instanceof PrefixShardCreated) {
+ onPrefixShardCreated((PrefixShardCreated) message);
+ } else if (message instanceof CreatePrefixShard) {
+ onCreatePrefixShard((CreatePrefixShard) message);
+ } else if (message instanceof RemovePrefixShard) {
+ onRemovePrefixShard((RemovePrefixShard) message);
+ } else if (message instanceof PrefixShardRemoved) {
+ onPrefixShardRemoved((PrefixShardRemoved) message);
+ }
+ }
+
+ @Override
+ public String persistenceId() {
+ return PERSISTENCE_ID;
+ }
+
+ private void memberUp(final MemberUp message) {
+ final MemberName memberName = memberToName(message.member());
+
+ LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
+ resolver.addPeerAddress(memberName, message.member().address());
+ }
+
+ private void memberWeaklyUp(final MemberWeaklyUp message) {
+ final MemberName memberName = memberToName(message.member());
+
+ LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
+ resolver.addPeerAddress(memberName, message.member().address());
+ }
+
+ private void memberExited(final MemberExited message) {
+ final MemberName memberName = memberToName(message.member());
+
+ LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
+ resolver.removePeerAddress(memberName);
+ }
+
+ private void memberRemoved(final MemberRemoved message) {
+ final MemberName memberName = memberToName(message.member());
+
+ LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
+ resolver.removePeerAddress(memberName);
+ }
+
+ private void memberUnreachable(final UnreachableMember message) {
+ final MemberName memberName = memberToName(message.member());
+ LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+ resolver.removePeerAddress(memberName);
+ }
+
+ private void memberReachable(final ReachableMember message) {
+ final MemberName memberName = memberToName(message.member());
+ LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+ resolver.addPeerAddress(memberName, message.member().address());
+ }
+
+ private void onProducerCreated(final ProducerCreated message) {
+ LOG.debug("Received ProducerCreated: {}", message);
+ final ActorRef sender = getSender();
+ final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
+
+ final List<CompletableFuture<Object>> futures = new ArrayList<>();
+
+ for (final String address : resolver.getShardingServicePeerActorAddresses()) {
+ final ActorSelection actorSelection = actorSystem.actorSelection(address);
+ futures.add(
+ FutureConverters.toJava(
+ actorContext.executeOperationAsync(
+ actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
+ .toCompletableFuture());
+ }
+
+ final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+ futures.toArray(new CompletableFuture[futures.size()]));
+
+ combinedFuture.thenRun(() -> {
+ for (final CompletableFuture<Object> future : futures) {
+ try {
+ final Object result = future.get();
+ if (result instanceof Status.Failure) {
+ sender.tell(result, self());
+ return;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ sender.tell(new Status.Failure(e), self());
+ return;
+ }
+ }
+ sender.tell(new Status.Success(null), noSender());
+ }).exceptionally(throwable -> {
+ sender.tell(new Status.Failure(throwable), self());
+ return null;
+ });
+ }
+
+ private void onNotifyProducerCreated(final NotifyProducerCreated message) {
+ LOG.debug("Received NotifyProducerCreated: {}", message);
+
+ final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
+
+ try {
+ final ActorProducerRegistration registration =
+ new ActorProducerRegistration(dataTreeService.createProducer(subtrees), subtrees);
+ subtrees.forEach(id -> idToProducer.put(id, registration));
+ sender().tell(new Status.Success(null), self());
+ } catch (final IllegalArgumentException e) {
+ sender().tell(new Status.Failure(e), getSelf());
+ }
+ }
+
+ private void onProducerRemoved(final ProducerRemoved message) {
+ LOG.debug("Received ProducerRemoved: {}", message);
+
+ final List<CompletableFuture<Object>> futures = new ArrayList<>();
+
+ for (final String address : resolver.getShardingServicePeerActorAddresses()) {
+ final ActorSelection selection = actorSystem.actorSelection(address);
+
+ futures.add(FutureConverters.toJava(
+ actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
+ .toCompletableFuture());
+ }
+
+ final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+ futures.toArray(new CompletableFuture[futures.size()]));
+
+ final ActorRef respondTo = getSender();
+
+ combinedFuture
+ .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
+ .exceptionally(e -> {
+ respondTo.tell(new Status.Failure(null), self());
+ return null;
+ });
+
+ }
+
+ private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
+ LOG.debug("Received NotifyProducerRemoved: {}", message);
+
+ final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
+ if (registration == null) {
+ LOG.warn("The notification contained a path on which no producer is registered, throwing away");
+ getSender().tell(new Status.Success(null), noSender());
+ return;
+ }
+
+ try {
+ registration.close();
+ getSender().tell(new Status.Success(null), noSender());
+ } catch (final DOMDataTreeProducerException e) {
+ LOG.error("Unable to close producer", e);
+ getSender().tell(new Status.Failure(e), noSender());
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void onCreatePrefixShard(final CreatePrefixShard message) {
+ LOG.debug("Received CreatePrefixShard: {}", message);
+
+ final PrefixShardConfiguration configuration = message.getConfiguration();
+
+ final DOMDataTreeProducer producer =
+ dataTreeService.createProducer(Collections.singleton(configuration.getPrefix()));
+
+ final DistributedDataStore distributedDataStore =
+ configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION
+ ? distributedConfigDatastore : distributedOperDatastore;
+ final String shardName = ClusterUtils.getCleanShardName(configuration.getPrefix().getRootIdentifier());
+ LOG.debug("Creating distributed datastore client for shard {}", shardName);
+ final Props distributedDataStoreClientProps =
+ SimpleDataStoreClientActor.props(cluster.getCurrentMemberName(),
+ "Shard-" + shardName, distributedDataStore.getActorContext(), shardName);
+
+ final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+ final DataStoreClient client;
+ try {
+ client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (final Exception e) {
+ LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ throw Throwables.propagate(e);
+ }
+
+ try {
+ final ListenerRegistration<ShardFrontend> shardFrontendRegistration =
+ shardingService.registerDataTreeShard(configuration.getPrefix(),
+ new ShardFrontend(
+ client,
+ configuration.getPrefix()
+ ),
+ producer);
+ idToShardRegistration.put(configuration.getPrefix(),
+ new ShardFrontendRegistration(clientActor, shardFrontendRegistration));
+
+ sender().tell(new Status.Success(null), self());
+ } catch (final DOMDataTreeShardingConflictException e) {
+ LOG.error("Unable to create shard", e);
+ clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ sender().tell(new Status.Failure(e), self());
+ } finally {
+ try {
+ producer.close();
+ } catch (final DOMDataTreeProducerException e) {
+ LOG.error("Unable to close producer that was used for shard registration {}", producer, e);
+ }
+ }
+ }
+
+ private void onPrefixShardCreated(final PrefixShardCreated message) {
+ LOG.debug("Received PrefixShardCreated: {}", message);
+
+ final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
+ final ActorRef sender = getSender();
+
+ final List<CompletableFuture<Object>> futures = new ArrayList<>();
+
+ for (final String address : addresses) {
+ final ActorSelection actorSelection = actorSystem.actorSelection(address);
+ futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
+ new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
+ }
+
+ final CompletableFuture<Void> combinedFuture =
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
+
+ combinedFuture.thenRun(() -> {
+ for (final CompletableFuture<Object> future : futures) {
+ try {
+ final Object result = future.get();
+ if (result instanceof Status.Failure) {
+ sender.tell(result, self());
+ return;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ sender.tell(new Status.Failure(e), self());
+ return;
+ }
+ }
+ sender.tell(new Status.Success(null), self());
+ }).exceptionally(throwable -> {
+ sender.tell(new Status.Failure(throwable), self());
+ return null;
+ });
+ }
+
+ private void onRemovePrefixShard(final RemovePrefixShard message) {
+ LOG.debug("Received RemovePrefixShard: {}", message);
+
+ for (final String address : resolver.getShardingServicePeerActorAddresses()) {
+ final ActorSelection selection = actorContext.actorSelection(address);
+ selection.tell(new PrefixShardRemoved(message.getPrefix()), getSelf());
+ }
+ }
+
+ private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+ LOG.debug("Received PrefixShardRemoved: {}", message);
+
+ final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
+
+ if (registration == null) {
+ LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}",
+ message.getPrefix(), idToShardRegistration);
+ return;
+ }
+
+ registration.close();
+ }
+
+ private static MemberName memberToName(final Member member) {
+ return MemberName.forName(member.roles().iterator().next());
+ }
+
+ private class ActorProducerRegistration {
+
+ private final DOMDataTreeProducer producer;
+ private final Collection<DOMDataTreeIdentifier> subtrees;
+
+ ActorProducerRegistration(final DOMDataTreeProducer producer,
+ final Collection<DOMDataTreeIdentifier> subtrees) {
+ this.producer = producer;
+ this.subtrees = subtrees;
+ }
+
+ void close() throws DOMDataTreeProducerException {
+ producer.close();
+ subtrees.forEach(idToProducer::remove);
+ }
+ }
+
+ private static class ShardFrontendRegistration extends
+ AbstractObjectRegistration<ListenerRegistration<ShardFrontend>> {
+
+ private final ActorRef clientActor;
+ private final ListenerRegistration<ShardFrontend> shardRegistration;
+
+ ShardFrontendRegistration(final ActorRef clientActor,
+ final ListenerRegistration<ShardFrontend> shardRegistration) {
+ super(shardRegistration);
+ this.clientActor = clientActor;
+ this.shardRegistration = shardRegistration;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ shardRegistration.close();
+ clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ }
+
+ public static class ShardedDataTreeActorCreator {
+
+ private DOMDataTreeService dataTreeService;
+ private DOMDataTreeShardingService shardingService;
+ private DistributedDataStore distributedConfigDatastore;
+ private DistributedDataStore distributedOperDatastore;
+ private ActorSystem actorSystem;
+ private ClusterWrapper cluster;
+
+ public DOMDataTreeService getDataTreeService() {
+ return dataTreeService;
+ }
+
+ public ShardedDataTreeActorCreator setDataTreeService(final DOMDataTreeService dataTreeService) {
+ this.dataTreeService = dataTreeService;
+ return this;
+ }
+
+ public DOMDataTreeShardingService getShardingService() {
+ return shardingService;
+ }
+
+ public ShardedDataTreeActorCreator setShardingService(final DOMDataTreeShardingService shardingService) {
+ this.shardingService = shardingService;
+ return this;
+ }
+
+ public ActorSystem getActorSystem() {
+ return actorSystem;
+ }
+
+ public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
+ this.actorSystem = actorSystem;
+ return this;
+ }
+
+ public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
+ this.cluster = cluster;
+ return this;
+ }
+
+ public ClusterWrapper getClusterWrapper() {
+ return cluster;
+ }
+
+ public DistributedDataStore getDistributedConfigDatastore() {
+ return distributedConfigDatastore;
+ }
+
+ public ShardedDataTreeActorCreator setDistributedConfigDatastore(
+ final DistributedDataStore distributedConfigDatastore) {
+ this.distributedConfigDatastore = distributedConfigDatastore;
+ return this;
+ }
+
+ public DistributedDataStore getDistributedOperDatastore() {
+ return distributedOperDatastore;
+ }
+
+ public ShardedDataTreeActorCreator setDistributedOperDatastore(
+ final DistributedDataStore distributedOperDatastore) {
+ this.distributedOperDatastore = distributedOperDatastore;
+ return this;
+ }
+
+ private void verify() {
+ Preconditions.checkNotNull(dataTreeService);
+ Preconditions.checkNotNull(shardingService);
+ Preconditions.checkNotNull(actorSystem);
+ Preconditions.checkNotNull(cluster);
+ Preconditions.checkNotNull(distributedConfigDatastore);
+ Preconditions.checkNotNull(distributedOperDatastore);
+ }
+
+ public Props props() {
+ verify();
+ return Props.create(ShardedDataTreeActor.class, this);
+ }
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import akka.actor.Address;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+
+/**
+ * Resolver for remote {@link ShardedDataTreeActor}'s.
+ */
+public class ShardingServiceAddressResolver {
+
+ private final ConcurrentMap<MemberName, Address> memberNameToAddress = new ConcurrentHashMap<>();
+ private final String shardingServiceActorIdentifier;
+ private final MemberName localMemberName;
+
+ public ShardingServiceAddressResolver(final String shardingServiceActorIdentifier,
+ final MemberName localMemberName) {
+ this.shardingServiceActorIdentifier = shardingServiceActorIdentifier;
+ this.localMemberName = localMemberName;
+ }
+
+ void addPeerAddress(final MemberName memberName, final Address address) {
+ memberNameToAddress.put(memberName, address);
+ }
+
+ void removePeerAddress(final MemberName memberName) {
+ memberNameToAddress.remove(memberName);
+ }
+
+ Address getPeerAddress(final MemberName memberName) {
+ return memberNameToAddress.get(memberName);
+ }
+
+ StringBuilder getActorPathBuilder(final Address address) {
+ return new StringBuilder().append(address.toString()).append("/user/").append(shardingServiceActorIdentifier);
+ }
+
+ Collection<String> getShardingServicePeerActorAddresses() {
+ final Collection<String> peerAddresses =
+ memberNameToAddress
+ .entrySet()
+ .stream()
+ .filter(entry -> !localMemberName.equals(entry.getKey()))
+ .map(entry -> getActorPathBuilder(entry.getValue()).toString())
+ .collect(Collectors.toList());
+
+ return peerAddresses;
+ }
+
+ public String resolve(final MemberName memberName) {
+ Preconditions.checkNotNull(memberName);
+ final Address address = memberNameToAddress.get(memberName);
+ Preconditions.checkNotNull(address, "Requested member[%s] is not present in the resolver ",
+ memberName.toString());
+
+ return getActorPathBuilder(address).toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+
+/**
+ * Sent to the local {@link ShardedDataTreeActor} when there was a shard created
+ * on the local node. The local actor should notify the remote actors with {@link PrefixShardCreated} which should
+ * create the required frontend/backend shards.
+ */
+@Beta
+public class CreatePrefixShard implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final PrefixShardConfiguration configuration;
+
+ public CreatePrefixShard(final PrefixShardConfiguration configuration) {
+ this.configuration = Preconditions.checkNotNull(configuration);
+ }
+
+ public PrefixShardConfiguration getConfiguration() {
+ return configuration;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Message sent to remote {@link ShardedDataTreeActor}'s when attempting
+ * to create a producer. The remote node should attempt to create a producer in the local sharding service and reply
+ * with success/failure based on the attempt result.
+ */
+@Beta
+public class NotifyProducerCreated implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final Collection<DOMDataTreeIdentifier> subtrees;
+
+ public NotifyProducerCreated(final Collection<DOMDataTreeIdentifier> subtrees) {
+ this.subtrees = ImmutableList.copyOf(Preconditions.checkNotNull(subtrees));
+ }
+
+ public Collection<DOMDataTreeIdentifier> getSubtrees() {
+ return subtrees;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Message sent to remote {@link ShardedDataTreeActor}'s when attempting
+ * to close a producer. The remote node should attempt to close a producer in the local sharding service and reply
+ * with success/failure based on the attempt result. If the producer doesn't exist on this node report Success.
+ */
+@Beta
+public class NotifyProducerRemoved implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final Collection<DOMDataTreeIdentifier> subtrees;
+
+ public NotifyProducerRemoved(final Collection<DOMDataTreeIdentifier> subtrees) {
+ this.subtrees = ImmutableList.copyOf(Preconditions.checkNotNull(subtrees));
+ }
+
+ public Collection<DOMDataTreeIdentifier> getSubtrees() {
+ return subtrees;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+
+/**
+ * Message sent to the local {@link ShardedDataTreeActor} when a clustered
+ * shard was created locally. The backend shards/replicas will be handled by the ShardManager but the
+ * {@link ShardedDataTreeActor} needs to handle the registration of the
+ * frontends into the {@link org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService}. The configuration only contains
+ * the Member nodes that this is still yet to be distributed to. The last node will receive PrefixShardConfiguration
+ * with only it's member present.
+ */
+@Beta
+public class PrefixShardCreated {
+ private final PrefixShardConfiguration configuration;
+
+ public PrefixShardCreated(final PrefixShardConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ public PrefixShardConfiguration getConfiguration() {
+ return configuration;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import java.io.Serializable;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Message sent to remote {@link ShardedDataTreeActor}'s when there is an
+ * attempt to remove a shard from the sharding service.
+ */
+@Beta
+public class PrefixShardRemoved implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final DOMDataTreeIdentifier prefix;
+
+ public PrefixShardRemoved(final DOMDataTreeIdentifier prefix) {
+ this.prefix = prefix;
+ }
+
+ public DOMDataTreeIdentifier getPrefix() {
+ return prefix;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Message sent to local {@link ShardedDataTreeActor}'s when there was an
+ * attempt to create a producer on the local node.
+ */
+@Beta
+public class ProducerCreated {
+ private final Collection<DOMDataTreeIdentifier> subtrees;
+
+ public ProducerCreated(final Collection<DOMDataTreeIdentifier> subtrees) {
+ this.subtrees = subtrees;
+ }
+
+ public Collection<DOMDataTreeIdentifier> getSubtrees() {
+ return subtrees;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.annotations.Beta;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Message sent to local {@link ShardedDataTreeActor}'s when there was an
+ * attempt to close a producer on the local node.
+ */
+@Beta
+public class ProducerRemoved {
+
+ private final Collection<DOMDataTreeIdentifier> subtrees;
+
+ public ProducerRemoved(final Collection<DOMDataTreeIdentifier> subtrees) {
+ this.subtrees = subtrees;
+ }
+
+ public Collection<DOMDataTreeIdentifier> getSubtrees() {
+ return subtrees;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Sent to the local {@link ShardedDataTreeActor} to notify of a shard removal on the local node. The local actor
+ * should then notify the remote nodes of the Removal with {@link PrefixShardRemoved} message.
+ */
+public class RemovePrefixShard {
+
+ private final DOMDataTreeIdentifier prefix;
+
+ public RemovePrefixShard(final DOMDataTreeIdentifier prefix) {
+
+ this.prefix = Preconditions.checkNotNull(prefix);
+ }
+
+ public DOMDataTreeIdentifier getPrefix() {
+ return prefix;
+ }
+}
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
return dataStore;
}
+ public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
+ final SchemaContext schemaContext) {
+ final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
+ final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
+
+ getDatastoreContextBuilder().dataStoreName(typeName);
+
+ final DatastoreContext datastoreContext = getDatastoreContextBuilder().build();
+
+ final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+ Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
+ Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+
+ final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
+ configuration, mockContextFactory, restoreFromSnapshot);
+
+ dataStore.onGlobalContextUpdated(schemaContext);
+
+ datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
+ return dataStore;
+ }
+
public void waitUntilLeader(final ActorContext actorContext, final String... shardNames) {
for (String shardName: shardNames) {
ActorRef shard = findLocalShard(actorContext, shardName);
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.cluster.Cluster;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+
+public class DistributedShardedDOMDataTreeTest extends AbstractTest {
+
+ private static final Address MEMBER_1_ADDRESS =
+ AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+
+ private static final DOMDataTreeIdentifier TEST_ID =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+ private ShardedDOMDataTree shardedDOMDataTree = new ShardedDOMDataTree();
+
+ private ActorSystem leaderSystem;
+ private ActorSystem followerSystem;
+
+
+ private final Builder leaderDatastoreContextBuilder =
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+
+ private final DatastoreContext.Builder followerDatastoreContextBuilder =
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ private DistributedDataStore followerDistributedDataStore;
+ private DistributedDataStore leaderDistributedDataStore;
+ private IntegrationTestKit followerTestKit;
+ private IntegrationTestKit leaderTestKit;
+
+ private DistributedShardedDOMDataTree leaderShardFactory;
+ private DistributedShardedDOMDataTree followerShardFactory;
+
+ @Before
+ public void setUp() {
+ shardedDOMDataTree = new ShardedDOMDataTree();
+
+ leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
+
+ followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+ Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
+ }
+
+ @After
+ public void tearDown() {
+ if (followerDistributedDataStore != null) {
+ leaderDistributedDataStore.close();
+ }
+ if (leaderDistributedDataStore != null) {
+ leaderDistributedDataStore.close();
+ }
+
+ JavaTestKit.shutdownActorSystem(leaderSystem);
+ JavaTestKit.shutdownActorSystem(followerSystem);
+ }
+
+ private void initEmptyDatastore(final String type) {
+ leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+
+ leaderDistributedDataStore =
+ leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+
+ followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+ followerDistributedDataStore =
+ followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+
+ leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
+ Mockito.mock(DistributedDataStore.class),
+ leaderDistributedDataStore);
+
+ followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
+ Mockito.mock(DistributedDataStore.class),
+ followerDistributedDataStore);
+ }
+
+ @Test
+ public void testProducerRegistrations() throws Exception {
+ initEmptyDatastore("config");
+
+ final DistributedShardRegistration shardRegistration =
+ leaderShardFactory.createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+ final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+
+ leaderShardManager.tell(
+ new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef());
+ leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalShardFound.class);
+
+ IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+ leaderShardManager.tell(
+ new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), leaderTestKit.getRef());
+ leaderTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), LocalPrimaryShardFound.class);
+
+ final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
+ followerShardManager.tell(
+ new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
+ followerTestKit.expectMsgClass(JavaTestKit.duration("10 seconds"), RemotePrimaryShardFound.class);
+
+ final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+ try {
+ followerShardFactory.createProducer(Collections.singleton(TEST_ID));
+ fail("Producer should be already registered on the other node");
+ } catch (final IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("is attached to producer"));
+ }
+
+ producer.close();
+
+ final DOMDataTreeProducer followerProducer =
+ followerShardFactory.createProducer(Collections.singleton(TEST_ID));
+ try {
+ leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+ fail("Producer should be already registered on the other node");
+ } catch (final IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("is attached to producer"));
+ }
+
+ followerProducer.close();
+ // try to create a shard on an already registered prefix on follower
+ try {
+ followerShardFactory.createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ fail("This prefix already should have a shard registration that was forwarded from the other node");
+ } catch (final DOMDataTreeShardingConflictException e) {
+ assertTrue(e.getMessage().contains("is already occupied by shard"));
+ }
+ }
+
+ @Test
+ @Ignore("Needs some other stuff related to 5280")
+ public void testWriteIntoMultipleShards() throws Exception {
+ initEmptyDatastore("config");
+
+ final DistributedShardRegistration shardRegistration =
+ leaderShardFactory.createDistributedShard(
+ TEST_ID,Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+ final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+
+ new JavaTestKit(leaderSystem) {
+ {
+ leaderShardManager.tell(
+ new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
+
+ followerShardManager.tell(
+ new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ leaderDistributedDataStore.getActorContext().getShardManager().tell(
+ new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
+ }
+ };
+
+ final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+
+ final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
+ final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
+ Assert.assertNotNull(cursor);
+ final YangInstanceIdentifier nameId =
+ YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
+ cursor.write(nameId.getLastPathArgument(),
+ ImmutableLeafNodeBuilder.<String>create()
+ .withNodeIdentifier(new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
+
+ cursor.close();
+ tx.submit();
+
+
+ }
+}