import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
+import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
+import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard;
+import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification;
+import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
+import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Beta
public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener {
- private static final class SubshardProducerSpecification {
- private final Collection<DOMDataTreeIdentifier> prefixes = new ArrayList<>(1);
- private final ChildShardContext shard;
-
- SubshardProducerSpecification(final ChildShardContext subshard) {
- this.shard = Preconditions.checkNotNull(subshard);
- }
-
- void addPrefix(final DOMDataTreeIdentifier prefix) {
- prefixes.add(prefix);
- }
-
- DOMDataTreeShardProducer createProducer() {
- return shard.getShard().createProducer(prefixes);
- }
-
- public DOMDataTreeIdentifier getPrefix() {
- return shard.getPrefix();
- }
- }
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShard.class);
+ private static final int DEFAULT_SUBMIT_QUEUE_SIZE = 1000;
private final DOMDataTreePrefixTable<ChildShardContext> childShardsTable = DOMDataTreePrefixTable.create();
-
private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
+ private final Collection<InMemoryDOMDataTreeShardProducer> producers = new HashSet<>();
+ private final InMemoryDOMDataTreeShardChangePublisher shardChangePublisher;
+ private final ListeningExecutorService executor;
private final DOMDataTreeIdentifier prefix;
private final DataTree dataTree;
- private InMemoryDOMDataTreeShardChangePublisher shardChangePublisher;
-
- private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor,
- final int maxDataChangeListenerQueueSize) {
+ private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final Executor dataTreeChangeExecutor,
+ final int maxDataChangeListenerQueueSize, final int submitQueueSize) {
this.prefix = Preconditions.checkNotNull(prefix);
final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier());
- this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor, maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
+ this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor,
+ maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
+
+ final FastThreadPoolExecutor fte = new FastThreadPoolExecutor(1, submitQueueSize, "Shard[" + prefix + "]");
+ fte.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerWaitsPolicy());
+ this.executor = MoreExecutors.listeningDecorator(fte);
}
- public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id, final ExecutorService dataTreeChangeExecutor,
+ public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
+ final Executor dataTreeChangeExecutor,
final int maxDataChangeListenerQueueSize) {
- return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize);
+ return new InMemoryDOMDataTreeShard(id.toOptimized(), dataTreeChangeExecutor,
+ maxDataChangeListenerQueueSize, DEFAULT_SUBMIT_QUEUE_SIZE);
+ }
+
+ public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
+ final Executor dataTreeChangeExecutor,
+ final int maxDataChangeListenerQueueSize,
+ final int submitQueueSize) {
+ return new InMemoryDOMDataTreeShard(id.toOptimized(), dataTreeChangeExecutor,
+ maxDataChangeListenerQueueSize, submitQueueSize);
}
@Override
public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
Preconditions.checkArgument(child != this, "Attempted to attach child %s onto self", this);
reparentChildShards(prefix, child);
- addChildShard(prefix, child);
+
+ final ChildShardContext context = createContextFor(prefix, child);
+ childShards.put(prefix, context);
+ childShardsTable.store(prefix, context);
+ updateProducers();
}
@Override
public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
childShards.remove(prefix);
+ childShardsTable.remove(prefix);
+ updateProducers();
+ }
+
+ private void updateProducers() {
+ for (InMemoryDOMDataTreeShardProducer p : producers) {
+ p.setModificationFactory(createModificationFactory(p.getPrefixes()));
+ }
+ }
+
+ @VisibleForTesting
+ InMemoryShardDataModificationFactory createModificationFactory(final Collection<DOMDataTreeIdentifier> prefixes) {
+ final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affected = new HashMap<>();
+ for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
+ for (final ChildShardContext child : childShards.values()) {
+ final DOMDataTreeIdentifier bindPath;
+ if (producerPrefix.contains(child.getPrefix())) {
+ bindPath = child.getPrefix();
+ } else if (child.getPrefix().contains(producerPrefix)) {
+ // Bound path is inside subshard
+ bindPath = producerPrefix;
+ } else {
+ continue;
+ }
+
+ SubshardProducerSpecification spec = affected.get(child.getPrefix());
+ if (spec == null) {
+ spec = new SubshardProducerSpecification(child);
+ affected.put(child.getPrefix(), spec);
+ }
+ spec.addPrefix(bindPath);
+ }
+ }
+
+ final InmemoryShardDataModificationFactoryBuilder builder =
+ new InmemoryShardDataModificationFactoryBuilder(prefix);
+ for (final SubshardProducerSpecification spec : affected.values()) {
+ final ForeignShardModificationContext foreignContext =
+ new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
+ builder.addSubshard(foreignContext);
+ builder.addSubshard(spec.getPrefix(), foreignContext);
+ }
+
+ return builder.build();
}
@Override
public InMemoryDOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> prefixes) {
- for (DOMDataTreeIdentifier prodPrefix : prefixes) {
+ for (final DOMDataTreeIdentifier prodPrefix : prefixes) {
Preconditions.checkArgument(prefix.contains(prodPrefix), "Prefix %s is not contained under shart root",
- prodPrefix, prefix);
+ prodPrefix, prefix);
+ }
+
+ final InMemoryDOMDataTreeShardProducer ret = new InMemoryDOMDataTreeShardProducer(this, prefixes,
+ createModificationFactory(prefixes));
+ producers.add(ret);
+ return ret;
+ }
+
+ void closeProducer(final InMemoryDOMDataTreeShardProducer producer) {
+ if (!producers.remove(producer)) {
+ LOG.warn("Producer {} not found in shard {}", producer, this);
}
- return new InMemoryDOMDataTreeShardProducer(this, prefixes);
}
@Nonnull
@Override
- public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(@Nonnull YangInstanceIdentifier treeId, @Nonnull L listener) {
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+ @Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
return shardChangePublisher.registerTreeChangeListener(treeId, listener);
}
- private void addChildShard(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
- ChildShardContext context = createContextFor(prefix, child);
- childShards.put(prefix, context);
- childShardsTable.store(prefix, context);
- }
-
private void reparentChildShards(final DOMDataTreeIdentifier newChildPrefix, final DOMDataTreeShard newChild) {
- Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren = childShards.entrySet().iterator();
- Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
+ final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren =
+ childShards.entrySet().iterator();
+ final Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
while (actualChildren.hasNext()) {
final Entry<DOMDataTreeIdentifier, ChildShardContext> actualChild = actualChildren.next();
final DOMDataTreeIdentifier actualPrefix = actualChild.getKey();
Preconditions.checkArgument(!newChildPrefix.equals(actualPrefix),
"Child shard with prefix %s already attached", newChildPrefix);
if (newChildPrefix.contains(actualPrefix)) {
- ChildShardContext actualContext = actualChild.getValue();
+ final ChildShardContext actualContext = actualChild.getValue();
actualChildren.remove();
newChild.onChildAttached(actualPrefix, actualContext.getShard());
reparented.put(actualChild.getKey(), actualContext);
throw new UnsupportedOperationException();
}
- private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+ private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix,
+ final DOMDataTreeShard child) {
Preconditions.checkArgument(child instanceof WriteableDOMDataTreeShard,
- "Child %s is not a writable shared", child);
+ "Child %s is not a writable shared", child);
return new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child);
}
@VisibleForTesting
Map<DOMDataTreeIdentifier, DOMDataTreeShard> getChildShards() {
- Map<DOMDataTreeIdentifier, DOMDataTreeShard> ret = new HashMap<>();
- for (Entry<DOMDataTreeIdentifier, ChildShardContext> entry : childShards.entrySet()) {
- ret.put(entry.getKey(), entry.getValue().getShard());
- }
- return ret;
+ return ImmutableMap.copyOf(Maps.transformValues(childShards, ChildShardContext::getShard));
}
- InmemoryDOMDataTreeShardWriteTransaction createTransaction(final InmemoryDOMDataTreeShardWriteTransaction previousTx) {
- // FIXME: implement this
- throw new UnsupportedOperationException();
+ DataTreeSnapshot takeSnapshot() {
+ return dataTree.takeSnapshot();
}
- InmemoryDOMDataTreeShardWriteTransaction createTransaction(final Collection<DOMDataTreeIdentifier> prefixes) {
-
- Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
- for (DOMDataTreeIdentifier producerPrefix : prefixes) {
- for (ChildShardContext maybeAffected : childShards.values()) {
- final DOMDataTreeIdentifier bindPath;
- if (producerPrefix.contains(maybeAffected.getPrefix())) {
- bindPath = maybeAffected.getPrefix();
- } else if (maybeAffected.getPrefix().contains(producerPrefix)) {
- // Bound path is inside subshard
- bindPath = producerPrefix;
- } else {
- continue;
- }
-
- SubshardProducerSpecification spec = affectedSubshards.get(maybeAffected.getPrefix());
- if (spec == null) {
- spec = new SubshardProducerSpecification(maybeAffected);
- affectedSubshards.put(maybeAffected.getPrefix(), spec);
- }
- spec.addPrefix(bindPath);
- }
- }
-
- ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix,
- (CursorAwareDataTreeSnapshot) dataTree.takeSnapshot());
- ShardDataModificationBuilder builder = new ShardDataModificationBuilder(rootContext);
- for (SubshardProducerSpecification spec : affectedSubshards.values()) {
- ForeignShardModificationContext foreignContext =
- new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
- builder.addSubshard(foreignContext);
- builder.addSubshard(spec.getPrefix(), foreignContext);
- }
+ InmemoryDOMDataTreeShardWriteTransaction createTransaction(final String transactionId,
+ final InMemoryDOMDataTreeShardProducer producer, final DataTreeSnapshot snapshot) {
+ Preconditions.checkArgument(snapshot instanceof CursorAwareDataTreeSnapshot);
- return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree, shardChangePublisher);
+ return new InmemoryDOMDataTreeShardWriteTransaction(producer,
+ producer.getModificationFactory().createModification((CursorAwareDataTreeSnapshot) snapshot), dataTree,
+ shardChangePublisher, executor);
}
}