Use a bounded blocking queue in InmemoryDOMDataTreeShards.
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InMemoryDOMDataTreeShard.java
index f518346daad47f84cf1ac26bcc1e3d05fcdc76f1..031d9b297cc739a465e1d6f2e8e3b11eb6e70157 100644 (file)
@@ -20,7 +20,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import javax.annotation.Nonnull;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
@@ -28,6 +27,8 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
+import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -39,9 +40,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 @Beta
 public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener {
 
+    private static final int DEFAULT_SUBMIT_QUEUE_SIZE = 1000;
+
     private static final class SubshardProducerSpecification {
         private final Collection<DOMDataTreeIdentifier> prefixes = new ArrayList<>(1);
         private final ChildShardContext shard;
+
         SubshardProducerSpecification(final ChildShardContext subshard) {
             this.shard = Preconditions.checkNotNull(subshard);
         }
@@ -67,19 +71,33 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
     private final ListeningExecutorService executor;
 
     private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor,
-                                     final int maxDataChangeListenerQueueSize) {
+                                     final int maxDataChangeListenerQueueSize, final int submitQueueSize) {
         this.prefix = Preconditions.checkNotNull(prefix);
 
         final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
         this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier());
 
-        this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor, maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
-        this.executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+        this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor,
+                maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
+
+        final FastThreadPoolExecutor fte = new FastThreadPoolExecutor(1, submitQueueSize, "Shard[" + prefix + "]");
+        fte.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerWaitsPolicy());
+        this.executor = MoreExecutors.listeningDecorator(fte);
     }
 
-    public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id, final ExecutorService dataTreeChangeExecutor,
+    public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
+                                                  final ExecutorService dataTreeChangeExecutor,
                                                   final int maxDataChangeListenerQueueSize) {
-        return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize);
+        return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor,
+                maxDataChangeListenerQueueSize, DEFAULT_SUBMIT_QUEUE_SIZE);
+    }
+
+    public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
+                                                  final ExecutorService dataTreeChangeExecutor,
+                                                  final int maxDataChangeListenerQueueSize,
+                                                  final int submitQueueSize) {
+        return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor,
+                maxDataChangeListenerQueueSize, submitQueueSize);
     }
 
     @Override
@@ -111,7 +129,8 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
 
     @Nonnull
     @Override
-    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(@Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+            @Nonnull final YangInstanceIdentifier treeId, @Nonnull final L listener) {
         return shardChangePublisher.registerTreeChangeListener(treeId, listener);
     }
 
@@ -122,7 +141,8 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
     }
 
     private void reparentChildShards(final DOMDataTreeIdentifier newChildPrefix, final DOMDataTreeShard newChild) {
-        final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren = childShards.entrySet().iterator();
+        final Iterator<Entry<DOMDataTreeIdentifier, ChildShardContext>> actualChildren =
+                childShards.entrySet().iterator();
         final Map<DOMDataTreeIdentifier, ChildShardContext> reparented = new HashMap<>();
         while (actualChildren.hasNext()) {
             final Entry<DOMDataTreeIdentifier, ChildShardContext> actualChild = actualChildren.next();
@@ -149,7 +169,8 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
         throw new UnsupportedOperationException();
     }
 
-    private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+    private static ChildShardContext createContextFor(final DOMDataTreeIdentifier prefix,
+            final DOMDataTreeShard child) {
         Preconditions.checkArgument(child instanceof WriteableDOMDataTreeShard,
             "Child %s is not a writable shared", child);
         return new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child);
@@ -175,7 +196,8 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
         return ret;
     }
 
-    InmemoryDOMDataTreeShardWriteTransaction createTransaction(final InmemoryDOMDataTreeShardWriteTransaction previousTx) {
+    InmemoryDOMDataTreeShardWriteTransaction createTransaction(
+            final InmemoryDOMDataTreeShardWriteTransaction previousTx) {
         // FIXME: implement this
         throw new UnsupportedOperationException();
     }