Add batching of non-isolated transaction in ShardedDOMDataTreeProducer
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InmemoryDOMDataTreeShardWriteTransaction.java
index 95b28bdeb6c7f31bfe07b653841ecdb1f14601a8..2ef7ebb28bbc5da9e1eb220362f5ff639b0e1c5a 100644 (file)
@@ -13,12 +13,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
 import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
@@ -63,9 +60,9 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         void apply(final DOMDataTreeWriteCursor cursor, final YangInstanceIdentifier path,
                 final NormalizedNode<?, ?> data) {
             int enterCount = 0;
-            Iterator<PathArgument> it = path.getPathArguments().iterator();
+            final Iterator<PathArgument> it = path.getPathArguments().iterator();
             while (it.hasNext()) {
-                PathArgument currentArg = it.next();
+                final PathArgument currentArg = it.next();
                 if (it.hasNext()) {
                     // We need to enter one level deeper, we are not at leaf (modified) node
                     cursor.enter(currentArg);
@@ -78,24 +75,24 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         }
     }
 
+    private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
+    private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
     private final ShardDataModification modification;
-    private DOMDataTreeWriteCursor cursor;
-    private DataTree rootShardDataTree;
-    private DataTreeModification rootModification = null;
+    private final ListeningExecutorService executor;
+    private final DataTree rootShardDataTree;
 
-    private ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
-    private InMemoryDOMDataTreeShardChangePublisher changePublisher;
+    private DataTreeModification rootModification = null;
+    private DOMDataTreeWriteCursor cursor;
     private boolean finished = false;
 
-    // FIXME inject into shard?
-    private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
-
     InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root,
                                              final DataTree rootShardDataTree,
-                                             final InMemoryDOMDataTreeShardChangePublisher changePublisher) {
+                                             final InMemoryDOMDataTreeShardChangePublisher changePublisher,
+                                             final ListeningExecutorService executor) {
         this.modification = Preconditions.checkNotNull(root);
         this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
         this.changePublisher = Preconditions.checkNotNull(changePublisher);
+        this.executor = executor;
     }
 
     private DOMDataTreeWriteCursor getCursor() {
@@ -106,7 +103,7 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     }
 
     void delete(final YangInstanceIdentifier path) {
-        YangInstanceIdentifier relativePath = toRelative(path);
+        final YangInstanceIdentifier relativePath = toRelative(path);
         Preconditions.checkArgument(!YangInstanceIdentifier.EMPTY.equals(relativePath),
                 "Deletion of shard root is not allowed");
         SimpleCursorOperation.DELETE.apply(getCursor(), relativePath , null);
@@ -121,38 +118,33 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     }
 
     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
-        Optional<YangInstanceIdentifier> relative =
+        final Optional<YangInstanceIdentifier> relative =
                 path.relativeTo(modification.getPrefix().getRootIdentifier());
         Preconditions.checkArgument(relative.isPresent());
         return relative.get();
     }
 
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
-        // FIXME: Implement this
-        return null;
+        throw new UnsupportedOperationException("Not implemented yet");
     }
 
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-
-    public Object getIdentifier() {
-        // TODO Auto-generated method stub
-        return null;
+        throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
     public void close() {
         Preconditions.checkState(!finished, "Attempting to close an already finished transaction.");
         modification.closeTransactions();
-        cursor.close();
+        if (cursor != null) {
+            cursor.close();
+        }
         finished = true;
     }
 
     void cursorClosed() {
         Preconditions.checkNotNull(cursor);
+        modification.closeCursor();
         cursor = null;
     }
 
@@ -169,8 +161,10 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
         rootModification = modification.seal();
 
-        cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification, changePublisher));
-        for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry : modification.getChildShards().entrySet()) {
+        cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
+                rootShardDataTree, rootModification, changePublisher));
+        for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
+                modification.getChildShards().entrySet()) {
             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
         }
         finished = true;
@@ -183,7 +177,8 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         Preconditions.checkNotNull(cohorts);
         Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
 
-        final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(
+                modification.getPrefix(), cohorts));
 
         return submit;
     }
@@ -192,7 +187,8 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     public ListenableFuture<Boolean> validate() {
         LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
 
-        final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(
+                modification.getPrefix(), cohorts));
         return submit;
     }
 
@@ -200,7 +196,8 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     public ListenableFuture<Void> prepare() {
         LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
 
-        final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(
+                modification.getPrefix(), cohorts));
         return submit;
     }
 
@@ -208,20 +205,17 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     public ListenableFuture<Void> commit() {
         LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
 
-        final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(
+                modification.getPrefix(), cohorts));
         return submit;
     }
 
-    public void followUp() {
-
-    }
-
     @Override
     public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
         Preconditions.checkState(!finished, "Transaction is finished/closed already.");
         Preconditions.checkState(cursor == null, "Previous cursor wasn't closed");
-        DOMDataTreeWriteCursor ret = getCursor();
-        YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
+        final DOMDataTreeWriteCursor ret = getCursor();
+        final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
         ret.enter(relativePath.getPathArguments());
         return ret;
     }