import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nonnull;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
@Beta
-public class InMemoryDOMDataTreeShard implements WriteableDOMDataTreeShard, SchemaContextListener {
+public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener {
private static final class SubshardProducerSpecification {
private final Collection<DOMDataTreeIdentifier> prefixes = new ArrayList<>(1);
private final ChildShardContext shard;
-
SubshardProducerSpecification(final ChildShardContext subshard) {
this.shard = Preconditions.checkNotNull(subshard);
}
}
}
- private static final class ChildShardContext {
- private final WriteableDOMDataTreeShard shard;
- private final DOMDataTreeIdentifier prefix;
-
- public ChildShardContext(final DOMDataTreeIdentifier prefix, final WriteableDOMDataTreeShard shard) {
- this.prefix = Preconditions.checkNotNull(prefix);
- this.shard = Preconditions.checkNotNull(shard);
- }
-
- public WriteableDOMDataTreeShard getShard() {
- return shard;
- }
-
- public DOMDataTreeIdentifier getPrefix() {
- return prefix;
- }
- }
-
private final DOMDataTreePrefixTable<ChildShardContext> childShardsTable = DOMDataTreePrefixTable.create();
private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
private final DOMDataTreeIdentifier prefix;
private final DataTree dataTree;
+ private final InMemoryDOMDataTreeShardChangePublisher shardChangePublisher;
+ private final ListeningExecutorService executor;
- private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix) {
+ private SchemaContext schemaContext;
+
+ private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor,
+ final int maxDataChangeListenerQueueSize, final int maxCommitQueueSize) {
this.prefix = Preconditions.checkNotNull(prefix);
final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
- this.dataTree = prefix.getRootIdentifier().isEmpty() ? InMemoryDataTreeFactory.getInstance().create(treeType)
- : InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier());
+ this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier());
+
+ this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor, maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
+ this.executor = MoreExecutors.listeningDecorator(SpecialExecutors.newBoundedSingleThreadExecutor(maxCommitQueueSize, "Shard-executor[" + prefix + "]"));
}
- public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id) {
- return new InMemoryDOMDataTreeShard(id);
+ public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id, final ExecutorService dataTreeChangeExecutor,
+ final int maxDataChangeListenerQueueSize, final int maxCommitQueueSize) {
+ return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize, maxCommitQueueSize);
}
@Override
public void onGlobalContextUpdated(final SchemaContext context) {
dataTree.setSchemaContext(context);
+ schemaContext = context;
}
@Override
@Override
public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
childShards.remove(prefix);
+ childShardsTable.remove(prefix);
}
@Override
public InMemoryDOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> prefixes) {
- for (DOMDataTreeIdentifier prodPrefix : prefixes) {
+ for (final DOMDataTreeIdentifier prodPrefix : prefixes) {
Preconditions.checkArgument(prefix.contains(prodPrefix), "Prefix %s is not contained under shart root",
prodPrefix, prefix);
}
return new InMemoryDOMDataTreeShardProducer(this, prefixes);
}
+ @Nonnull
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(@Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
+ return shardChangePublisher.registerTreeChangeListener(treeId, listener);
+ }
+
private void addChildShard(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
- ChildShardContext context = createContextFor(prefix, child);
+ final ChildShardContext context = createContextFor(prefix, child);
childShards.put(prefix, context);
childShardsTable.store(prefix, context);
}
private void reparentChildShards(final DOMDataTreeIdentifier newChildPrefix, final DOMDataTreeShard newChild) {
- Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren = childShards.entrySet().iterator();
- Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
+ final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren = childShards.entrySet().iterator();
+ final Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
while (actualChildren.hasNext()) {
final Entry<DOMDataTreeIdentifier, ChildShardContext> actualChild = actualChildren.next();
final DOMDataTreeIdentifier actualPrefix = actualChild.getKey();
Preconditions.checkArgument(!newChildPrefix.equals(actualPrefix),
"Child shard with prefix %s already attached", newChildPrefix);
if (newChildPrefix.contains(actualPrefix)) {
- ChildShardContext actualContext = actualChild.getValue();
+ final ChildShardContext actualContext = actualChild.getValue();
actualChildren.remove();
newChild.onChildAttached(actualPrefix, actualContext.getShard());
reparented.put(actualChild.getKey(), actualContext);
@VisibleForTesting
Map<DOMDataTreeIdentifier, DOMDataTreeShard> getChildShards() {
- Map<DOMDataTreeIdentifier, DOMDataTreeShard> ret = new HashMap<>();
- for (Entry<DOMDataTreeIdentifier, ChildShardContext> entry : childShards.entrySet()) {
+ final Map<DOMDataTreeIdentifier, DOMDataTreeShard> ret = new HashMap<>();
+ for (final Entry<DOMDataTreeIdentifier, ChildShardContext> entry : childShards.entrySet()) {
ret.put(entry.getKey(), entry.getValue().getShard());
}
return ret;
InmemoryDOMDataTreeShardWriteTransaction createTransaction(final Collection<DOMDataTreeIdentifier> prefixes) {
- Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
- for (DOMDataTreeIdentifier producerPrefix : prefixes) {
- for (ChildShardContext maybeAffected : childShards.values()) {
+ final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
+ for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
+ for (final ChildShardContext maybeAffected : childShards.values()) {
final DOMDataTreeIdentifier bindPath;
if (producerPrefix.contains(maybeAffected.getPrefix())) {
bindPath = maybeAffected.getPrefix();
}
}
- ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix,
+ final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix,
(CursorAwareDataTreeSnapshot) dataTree.takeSnapshot());
- ShardDataModificationBuilder builder = new ShardDataModificationBuilder(rootContext);
- for (SubshardProducerSpecification spec : affectedSubshards.values()) {
- ForeignShardModificationContext foreignContext =
+ final ShardDataModificationBuilder builder = new ShardDataModificationBuilder(rootContext);
+ for (final SubshardProducerSpecification spec : affectedSubshards.values()) {
+ final ForeignShardModificationContext foreignContext =
new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
builder.addSubshard(foreignContext);
builder.addSubshard(spec.getPrefix(), foreignContext);
}
- return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree);
+ return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree, shardChangePublisher, executor);
}
}