BUG-2138: Add shard DTO classes to spi
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InMemoryDOMDataTreeShard.java
index 21ead91ae91b392f925584d9d61bf1340057cd65..6449653ad7207b9da94067befae64a404c47f7f2 100644 (file)
@@ -11,121 +11,180 @@ package org.opendaylight.mdsal.dom.store.inmemory;
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import javax.annotation.Nonnull;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
+import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
+import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard;
+import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification;
+import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
+import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Beta
 public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener {
 
-    private static final class SubshardProducerSpecification {
-        private final Collection<DOMDataTreeIdentifier> prefixes = new ArrayList<>(1);
-        private final ChildShardContext shard;
-        SubshardProducerSpecification(final ChildShardContext subshard) {
-            this.shard = Preconditions.checkNotNull(subshard);
-        }
-
-        void addPrefix(final DOMDataTreeIdentifier prefix) {
-            prefixes.add(prefix);
-        }
-
-        DOMDataTreeShardProducer createProducer() {
-            return shard.getShard().createProducer(prefixes);
-        }
-
-        public DOMDataTreeIdentifier getPrefix() {
-            return shard.getPrefix();
-        }
-    }
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShard.class);
+    private static final int DEFAULT_SUBMIT_QUEUE_SIZE = 1000;
 
     private final DOMDataTreePrefixTable<ChildShardContext> childShardsTable = DOMDataTreePrefixTable.create();
     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
-    private final DOMDataTreeIdentifier prefix;
-    private final DataTree dataTree;
+    private final Collection<InMemoryDOMDataTreeShardProducer> producers = new HashSet<>();
     private final InMemoryDOMDataTreeShardChangePublisher shardChangePublisher;
     private final ListeningExecutorService executor;
+    private final DOMDataTreeIdentifier prefix;
+    private final DataTree dataTree;
 
-    private SchemaContext schemaContext;
-
-    private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor,
-                                     final int maxDataChangeListenerQueueSize, final int maxCommitQueueSize) {
+    private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final Executor dataTreeChangeExecutor,
+                                     final int maxDataChangeListenerQueueSize, final int submitQueueSize) {
         this.prefix = Preconditions.checkNotNull(prefix);
 
         final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
         this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier());
 
-        this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor, maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
-        this.executor = MoreExecutors.listeningDecorator(SpecialExecutors.newBoundedSingleThreadExecutor(maxCommitQueueSize, "Shard-executor[" + prefix + "]"));
+        this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor,
+                maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
+
+        final FastThreadPoolExecutor fte = new FastThreadPoolExecutor(1, submitQueueSize, "Shard[" + prefix + "]");
+        fte.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerWaitsPolicy());
+        this.executor = MoreExecutors.listeningDecorator(fte);
+    }
+
+    public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
+                                                  final Executor dataTreeChangeExecutor,
+                                                  final int maxDataChangeListenerQueueSize) {
+        return new InMemoryDOMDataTreeShard(id.toOptimized(), dataTreeChangeExecutor,
+                maxDataChangeListenerQueueSize, DEFAULT_SUBMIT_QUEUE_SIZE);
     }
 
-    public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id, final ExecutorService dataTreeChangeExecutor,
-                                                  final int maxDataChangeListenerQueueSize, final int maxCommitQueueSize) {
-        return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize, maxCommitQueueSize);
+    public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
+                                                  final Executor dataTreeChangeExecutor,
+                                                  final int maxDataChangeListenerQueueSize,
+                                                  final int submitQueueSize) {
+        return new InMemoryDOMDataTreeShard(id.toOptimized(), dataTreeChangeExecutor,
+                maxDataChangeListenerQueueSize, submitQueueSize);
     }
 
     @Override
     public void onGlobalContextUpdated(final SchemaContext context) {
         dataTree.setSchemaContext(context);
-        schemaContext = context;
     }
 
     @Override
     public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
         Preconditions.checkArgument(child != this, "Attempted to attach child %s onto self", this);
         reparentChildShards(prefix, child);
-        addChildShard(prefix, child);
+
+        final ChildShardContext context = createContextFor(prefix, child);
+        childShards.put(prefix, context);
+        childShardsTable.store(prefix, context);
+        updateProducers();
     }
 
     @Override
     public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
         childShards.remove(prefix);
         childShardsTable.remove(prefix);
+        updateProducers();
+    }
+
+    private void updateProducers() {
+        for (InMemoryDOMDataTreeShardProducer p : producers) {
+            p.setModificationFactory(createModificationFactory(p.getPrefixes()));
+        }
+    }
+
+    @VisibleForTesting
+    InMemoryShardDataModificationFactory createModificationFactory(final Collection<DOMDataTreeIdentifier> prefixes) {
+        final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affected = new HashMap<>();
+        for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
+            for (final ChildShardContext child : childShards.values()) {
+                final DOMDataTreeIdentifier bindPath;
+                if (producerPrefix.contains(child.getPrefix())) {
+                    bindPath = child.getPrefix();
+                } else if (child.getPrefix().contains(producerPrefix)) {
+                    // Bound path is inside subshard
+                    bindPath = producerPrefix;
+                } else {
+                    continue;
+                }
+
+                SubshardProducerSpecification spec = affected.get(child.getPrefix());
+                if (spec == null) {
+                    spec = new SubshardProducerSpecification(child);
+                    affected.put(child.getPrefix(), spec);
+                }
+                spec.addPrefix(bindPath);
+            }
+        }
+
+        final InmemoryShardDataModificationFactoryBuilder builder =
+                new InmemoryShardDataModificationFactoryBuilder(prefix);
+        for (final SubshardProducerSpecification spec : affected.values()) {
+            final ForeignShardModificationContext foreignContext =
+                    new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
+            builder.addSubshard(foreignContext);
+            builder.addSubshard(spec.getPrefix(), foreignContext);
+        }
+
+        return builder.build();
     }
 
     @Override
     public InMemoryDOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> prefixes) {
         for (final DOMDataTreeIdentifier prodPrefix : prefixes) {
             Preconditions.checkArgument(prefix.contains(prodPrefix), "Prefix %s is not contained under shart root",
-                prodPrefix, prefix);
+                    prodPrefix, prefix);
+        }
+
+        final InMemoryDOMDataTreeShardProducer ret = new InMemoryDOMDataTreeShardProducer(this, prefixes,
+                createModificationFactory(prefixes));
+        producers.add(ret);
+        return ret;
+    }
+
+    void closeProducer(final InMemoryDOMDataTreeShardProducer producer) {
+        if (!producers.remove(producer)) {
+            LOG.warn("Producer {} not found in shard {}", producer, this);
         }
-        return new InMemoryDOMDataTreeShardProducer(this, prefixes);
     }
 
     @Nonnull
     @Override
-    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(@Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+            @Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
         return shardChangePublisher.registerTreeChangeListener(treeId, listener);
     }
 
-    private void addChildShard(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
-        final ChildShardContext context = createContextFor(prefix, child);
-        childShards.put(prefix, context);
-        childShardsTable.store(prefix, context);
-    }
-
     private void reparentChildShards(final DOMDataTreeIdentifier newChildPrefix, final DOMDataTreeShard newChild) {
-        final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren = childShards.entrySet().iterator();
+        final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren =
+                childShards.entrySet().iterator();
         final Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
         while (actualChildren.hasNext()) {
             final Entry<DOMDataTreeIdentifier, ChildShardContext> actualChild = actualChildren.next();
@@ -152,9 +211,10 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
         throw new UnsupportedOperationException();
     }
 
-    private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+    private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix,
+                                                      final DOMDataTreeShard child) {
         Preconditions.checkArgument(child instanceof WriteableDOMDataTreeShard,
-            "Child %s is not a writable shared", child);
+                "Child %s is not a writable shared", child);
         return new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child);
     }
 
@@ -171,52 +231,19 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
 
     @VisibleForTesting
     Map<DOMDataTreeIdentifier, DOMDataTreeShard> getChildShards() {
-        final Map<DOMDataTreeIdentifier, DOMDataTreeShard> ret = new HashMap<>();
-        for (final Entry<DOMDataTreeIdentifier, ChildShardContext> entry : childShards.entrySet()) {
-            ret.put(entry.getKey(), entry.getValue().getShard());
-        }
-        return ret;
+        return ImmutableMap.copyOf(Maps.transformValues(childShards, ChildShardContext::getShard));
     }
 
-    InmemoryDOMDataTreeShardWriteTransaction createTransaction(final InmemoryDOMDataTreeShardWriteTransaction previousTx) {
-        // FIXME: implement this
-        throw new UnsupportedOperationException();
+    DataTreeSnapshot takeSnapshot() {
+        return dataTree.takeSnapshot();
     }
 
-    InmemoryDOMDataTreeShardWriteTransaction createTransaction(final Collection<DOMDataTreeIdentifier> prefixes) {
-
-        final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
-        for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
-            for (final ChildShardContext maybeAffected : childShards.values()) {
-                final DOMDataTreeIdentifier bindPath;
-                if (producerPrefix.contains(maybeAffected.getPrefix())) {
-                    bindPath = maybeAffected.getPrefix();
-                } else if (maybeAffected.getPrefix().contains(producerPrefix)) {
-                    // Bound path is inside subshard
-                    bindPath = producerPrefix;
-                } else {
-                    continue;
-                }
-
-                SubshardProducerSpecification spec = affectedSubshards.get(maybeAffected.getPrefix());
-                if (spec == null) {
-                    spec = new SubshardProducerSpecification(maybeAffected);
-                    affectedSubshards.put(maybeAffected.getPrefix(), spec);
-                }
-                spec.addPrefix(bindPath);
-            }
-        }
-
-        final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix,
-                (CursorAwareDataTreeSnapshot) dataTree.takeSnapshot());
-        final ShardDataModificationBuilder builder = new ShardDataModificationBuilder(rootContext);
-        for (final SubshardProducerSpecification spec : affectedSubshards.values()) {
-            final ForeignShardModificationContext foreignContext =
-                    new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
-            builder.addSubshard(foreignContext);
-            builder.addSubshard(spec.getPrefix(), foreignContext);
-        }
+    InmemoryDOMDataTreeShardWriteTransaction createTransaction(final String transactionId,
+            final InMemoryDOMDataTreeShardProducer producer, final DataTreeSnapshot snapshot) {
+        Preconditions.checkArgument(snapshot instanceof CursorAwareDataTreeSnapshot);
 
-        return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree, shardChangePublisher, executor);
+        return new InmemoryDOMDataTreeShardWriteTransaction(producer,
+                producer.getModificationFactory().createModification((CursorAwareDataTreeSnapshot) snapshot), dataTree,
+                shardChangePublisher, executor);
     }
 }