X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FDistributedShardedDOMDataTree.java;h=cdb249f3cda01dcc45d50828fb565da445586bf6;hp=9726a986c15074f9838cbb8f38fa84f8860dcbe4;hb=b5cb353e3553a39f576c284119af75ffa5ea66a9;hpb=78fb550781389291d0d5142a098385c30dfd291d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java index 9726a986c1..cdb249f3cd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -25,6 +25,7 @@ import com.google.common.collect.ForwardingObject; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.AbstractMap.SimpleEntry; @@ -32,7 +33,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumMap; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletionStage; @@ -46,7 +49,7 @@ import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; @@ -54,6 +57,8 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer; +import org.opendaylight.controller.cluster.dom.api.CDSShardAccess; import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator; import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard; @@ -108,8 +113,8 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat private final ShardedDOMDataTree shardedDOMDataTree; private final ActorSystem actorSystem; - private final DistributedDataStore distributedOperDatastore; - private final DistributedDataStore distributedConfigDatastore; + private final AbstractDataStore distributedOperDatastore; + private final AbstractDataStore distributedConfigDatastore; private final ActorRef shardedDataTreeActor; private final MemberName memberName; @@ -130,8 +135,8 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat private final PrefixedShardConfigUpdateHandler updateHandler; public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider, - final DistributedDataStore distributedOperDatastore, - final DistributedDataStore distributedConfigDatastore) { + final AbstractDataStore distributedOperDatastore, + final AbstractDataStore distributedConfigDatastore) { this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem(); this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore); this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore); @@ -157,7 +162,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat createPrefixConfigShard(distributedOperDatastore); } - private void createPrefixConfigShard(final DistributedDataStore dataStore) { + private static void createPrefixConfigShard(final AbstractDataStore dataStore) { Configuration configuration = dataStore.getActorContext().getConfiguration(); Collection memberNames = configuration.getUniqueMemberNamesForAllShards(); CreateShard createShardMessage = @@ -303,14 +308,16 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat LOG.debug("{} - Received success from remote nodes, creating producer:{}", distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees); return new ProxyProducer(producer, subtrees, shardedDataTreeActor, - distributedConfigDatastore.getActorContext()); - } else if (response instanceof Exception) { - closeProducer(producer); - throw Throwables.propagate((Exception) response); - } else { - closeProducer(producer); - throw new RuntimeException("Unexpected response to create producer received." + response); + distributedConfigDatastore.getActorContext(), shards); + } + + closeProducer(producer); + + if (response instanceof Throwable) { + Throwables.throwIfUnchecked((Throwable) response); + throw new RuntimeException((Throwable) response); } + throw new RuntimeException("Unexpected response to create producer received." + response); } @Override @@ -362,7 +369,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat shardRegistrationPromise.failure( new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable)); } - }); + }, MoreExecutors.directExecutor()); return FutureConverters.toJava(shardRegistrationPromise.future()); } @@ -394,20 +401,19 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat private void createShardFrontend(final DOMDataTreeIdentifier prefix) { LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix); final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier()); - final DistributedDataStore distributedDataStore = + final AbstractDataStore distributedDataStore = prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION) ? distributedConfigDatastore : distributedOperDatastore; - try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) { + try (DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) { final Entry entry = createDatastoreClient(shardName, distributedDataStore.getActorContext()); final DistributedShardFrontend shard = new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix); - @SuppressWarnings("unchecked") final DOMDataTreeShardRegistration reg = - (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer); + shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer); synchronized (shards) { shards.store(prefix, reg); @@ -447,21 +453,22 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(@Nullable Void result) { + public void onSuccess(@Nullable final Void result) { LOG.debug("{} - Succesfuly removed shard for {}", memberName, prefix); } @Override - public void onFailure(Throwable throwable) { + public void onFailure(final Throwable throwable) { LOG.error("Removal of shard {} from configuration failed.", prefix, throwable); } - }); + }, MoreExecutors.directExecutor()); } DOMDataTreePrefixTableEntry> lookupShardFrontend( final DOMDataTreeIdentifier prefix) { - return shards.lookup(prefix); - + synchronized (shards) { + return shards.lookup(prefix); + } } DOMDataTreeProducer localCreateProducer(final Collection prefix) { @@ -611,13 +618,13 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat final Future closeFuture = ask.transform( new Mapper() { @Override - public Void apply(Object parameter) { + public Void apply(final Object parameter) { return null; } }, new Mapper() { @Override - public Throwable apply(Throwable throwable) { + public Throwable apply(final Throwable throwable) { return throwable; } }, actorSystem.dispatcher()); @@ -626,21 +633,32 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat } } - private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer { + // TODO what about producers created by this producer? + // They should also be CDSProducers + private static final class ProxyProducer extends ForwardingObject implements CDSDataTreeProducer { private final DOMDataTreeProducer delegate; private final Collection subtrees; private final ActorRef shardDataTreeActor; private final ActorContext actorContext; + @GuardedBy("shardAccessMap") + private final Map shardAccessMap = new HashMap<>(); + + // We don't have to guard access to shardTable in ProxyProducer. + // ShardTable's entries relevant to this ProxyProducer shouldn't + // change during producer's lifetime. + private final DOMDataTreePrefixTable> shardTable; ProxyProducer(final DOMDataTreeProducer delegate, final Collection subtrees, final ActorRef shardDataTreeActor, - final ActorContext actorContext) { + final ActorContext actorContext, + final DOMDataTreePrefixTable> shardLayout) { this.delegate = Preconditions.checkNotNull(delegate); this.subtrees = Preconditions.checkNotNull(subtrees); this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor); this.actorContext = Preconditions.checkNotNull(actorContext); + this.shardTable = Preconditions.checkNotNull(shardLayout); } @Nonnull @@ -658,12 +676,17 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat } @Override + @SuppressWarnings("checkstyle:IllegalCatch") public void close() throws DOMDataTreeProducerException { delegate.close(); + synchronized (shardAccessMap) { + shardAccessMap.values().forEach(CDSShardAccessImpl::close); + } + final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees)); if (o instanceof DOMDataTreeProducerException) { - throw ((DOMDataTreeProducerException) o); + throw (DOMDataTreeProducerException) o; } else if (o instanceof Throwable) { throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o); } @@ -673,5 +696,33 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat protected DOMDataTreeProducer delegate() { return delegate; } + + @Nonnull + @Override + public CDSShardAccess getShardAccess(@Nonnull final DOMDataTreeIdentifier subtree) { + Preconditions.checkArgument( + subtrees.stream().anyMatch(dataTreeIdentifier -> dataTreeIdentifier.contains(subtree)), + "Subtree %s is not controlled by this producer %s", subtree, this); + + final DOMDataTreePrefixTableEntry> lookup = + shardTable.lookup(subtree); + Preconditions.checkState(lookup != null, "Subtree %s is not contained in any registered shard.", subtree); + + final DOMDataTreeIdentifier lookupId = lookup.getValue().getPrefix(); + + synchronized (shardAccessMap) { + if (shardAccessMap.get(lookupId) != null) { + return shardAccessMap.get(lookupId); + } + + // TODO Maybe we can have static factory method and return the same instance + // for same subtrees. But maybe it is not needed since there can be only one + // producer attached to some subtree at a time. And also how we can close ShardAccess + // then + final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(lookupId, actorContext); + shardAccessMap.put(lookupId, shardAccess); + return shardAccess; + } + } } }