Fix InMemory shard transaction chaining.
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InMemoryDOMDataTreeShard.java
index 575c95d62a6751285900c32adbb9766b385f097b..98ad468a4bb6bfc14c69fbd7105324f17b08a9f1 100644 (file)
@@ -20,7 +20,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import javax.annotation.Nonnull;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
@@ -28,9 +27,12 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
+import org.opendaylight.yangtools.util.concurrent.FastThreadPoolExecutor;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -39,6 +41,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 @Beta
 public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeShard, SchemaContextListener {
 
+    private static final int DEFAULT_SUBMIT_QUEUE_SIZE = 1000;
+
     private static final class SubshardProducerSpecification {
         private final Collection<DOMDataTreeIdentifier> prefixes = new ArrayList<>(1);
         private final ChildShardContext shard;
@@ -68,7 +72,7 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
     private final ListeningExecutorService executor;
 
     private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor,
-                                     final int maxDataChangeListenerQueueSize) {
+                                     final int maxDataChangeListenerQueueSize, final int submitQueueSize) {
         this.prefix = Preconditions.checkNotNull(prefix);
 
         final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
@@ -76,12 +80,25 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
 
         this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor,
                 maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
-        this.executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+
+        final FastThreadPoolExecutor fte = new FastThreadPoolExecutor(1, submitQueueSize, "Shard[" + prefix + "]");
+        fte.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerWaitsPolicy());
+        this.executor = MoreExecutors.listeningDecorator(fte);
+    }
+
+    public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
+                                                  final ExecutorService dataTreeChangeExecutor,
+                                                  final int maxDataChangeListenerQueueSize) {
+        return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor,
+                maxDataChangeListenerQueueSize, DEFAULT_SUBMIT_QUEUE_SIZE);
     }
 
     public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id,
-            final ExecutorService dataTreeChangeExecutor, final int maxDataChangeListenerQueueSize) {
-        return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize);
+                                                  final ExecutorService dataTreeChangeExecutor,
+                                                  final int maxDataChangeListenerQueueSize,
+                                                  final int submitQueueSize) {
+        return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor,
+                maxDataChangeListenerQueueSize, submitQueueSize);
     }
 
     @Override
@@ -180,13 +197,22 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
         return ret;
     }
 
-    InmemoryDOMDataTreeShardWriteTransaction createTransaction(
-            final InmemoryDOMDataTreeShardWriteTransaction previousTx) {
-        // FIXME: implement this
-        throw new UnsupportedOperationException();
+    DataTreeSnapshot takeSnapshot() {
+        return dataTree.takeSnapshot();
     }
 
-    InmemoryDOMDataTreeShardWriteTransaction createTransaction(final Collection<DOMDataTreeIdentifier> prefixes) {
+    InmemoryDOMDataTreeShardWriteTransaction createTransaction(final String transactionId,
+                                                               final InMemoryDOMDataTreeShardProducer producer,
+                                                               final Collection<DOMDataTreeIdentifier> prefixes,
+                                                               final DataTreeSnapshot snapshot) {
+
+        return createTxForSnapshot(producer, prefixes, (CursorAwareDataTreeSnapshot) snapshot);
+    }
+
+    private InmemoryDOMDataTreeShardWriteTransaction createTxForSnapshot(
+            final InMemoryDOMDataTreeShardProducer producer,
+            final Collection<DOMDataTreeIdentifier> prefixes,
+            final CursorAwareDataTreeSnapshot snapshot) {
 
         final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
         for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
@@ -210,8 +236,7 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
             }
         }
 
-        final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix,
-                (CursorAwareDataTreeSnapshot) dataTree.takeSnapshot());
+        final ShardRootModificationContext rootContext = new ShardRootModificationContext(prefix, snapshot);
         final ShardDataModificationBuilder builder = new ShardDataModificationBuilder(rootContext);
         for (final SubshardProducerSpecification spec : affectedSubshards.values()) {
             final ForeignShardModificationContext foreignContext =
@@ -220,6 +245,8 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
             builder.addSubshard(spec.getPrefix(), foreignContext);
         }
 
-        return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree, shardChangePublisher, executor);
+        return new InmemoryDOMDataTreeShardWriteTransaction(producer, builder.build(),
+                dataTree, shardChangePublisher, executor);
     }
+
 }