Add batching of non-isolated transaction in ShardedDOMDataTreeProducer
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InmemoryDOMDataTreeShardWriteTransaction.java
index 0a754a5615000d81968caeca851af2978ad6acc4..2ef7ebb28bbc5da9e1eb220362f5ff639b0e1c5a 100644 (file)
@@ -11,15 +11,27 @@ package org.opendaylight.mdsal.dom.store.inmemory;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Map.Entry;
 import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
+
     private enum SimpleCursorOperation {
         MERGE {
             @Override
@@ -48,9 +60,9 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         void apply(final DOMDataTreeWriteCursor cursor, final YangInstanceIdentifier path,
                 final NormalizedNode<?, ?> data) {
             int enterCount = 0;
-            Iterator<PathArgument> it = path.getPathArguments().iterator();
+            final Iterator<PathArgument> it = path.getPathArguments().iterator();
             while (it.hasNext()) {
-                PathArgument currentArg = it.next();
+                final PathArgument currentArg = it.next();
                 if (it.hasNext()) {
                     // We need to enter one level deeper, we are not at leaf (modified) node
                     cursor.enter(currentArg);
@@ -63,22 +75,35 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         }
     }
 
+    private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
+    private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
     private final ShardDataModification modification;
+    private final ListeningExecutorService executor;
+    private final DataTree rootShardDataTree;
+
+    private DataTreeModification rootModification = null;
     private DOMDataTreeWriteCursor cursor;
+    private boolean finished = false;
 
-    InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root) {
+    InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root,
+                                             final DataTree rootShardDataTree,
+                                             final InMemoryDOMDataTreeShardChangePublisher changePublisher,
+                                             final ListeningExecutorService executor) {
         this.modification = Preconditions.checkNotNull(root);
+        this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
+        this.changePublisher = Preconditions.checkNotNull(changePublisher);
+        this.executor = executor;
     }
 
     private DOMDataTreeWriteCursor getCursor() {
         if (cursor == null) {
-            cursor = new ShardDataModificationCursor(modification);
+            cursor = new ShardDataModificationCursor(modification, this);
         }
         return cursor;
     }
 
     void delete(final YangInstanceIdentifier path) {
-        YangInstanceIdentifier relativePath = toRelative(path);
+        final YangInstanceIdentifier relativePath = toRelative(path);
         Preconditions.checkArgument(!YangInstanceIdentifier.EMPTY.equals(relativePath),
                 "Deletion of shard root is not allowed");
         SimpleCursorOperation.DELETE.apply(getCursor(), relativePath , null);
@@ -93,49 +118,104 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     }
 
     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
-        Optional<YangInstanceIdentifier> relative =
+        final Optional<YangInstanceIdentifier> relative =
                 path.relativeTo(modification.getPrefix().getRootIdentifier());
         Preconditions.checkArgument(relative.isPresent());
         return relative.get();
     }
 
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
-        // FIXME: Implement this
-        return null;
+        throw new UnsupportedOperationException("Not implemented yet");
     }
 
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
-        // TODO Auto-generated method stub
-        return null;
+        throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Override
+    public void close() {
+        Preconditions.checkState(!finished, "Attempting to close an already finished transaction.");
+        modification.closeTransactions();
+        if (cursor != null) {
+            cursor.close();
+        }
+        finished = true;
+    }
 
-    public Object getIdentifier() {
-        // TODO Auto-generated method stub
-        return null;
+    void cursorClosed() {
+        Preconditions.checkNotNull(cursor);
+        modification.closeCursor();
+        cursor = null;
     }
 
-    public void close() {
-        // TODO Auto-generated method stub
+    public boolean isFinished() {
+        return finished;
     }
 
     @Override
     public void ready() {
+        Preconditions.checkState(!finished, "Attempting to ready an already finished transaction.");
+        Preconditions.checkState(cursor == null, "Attempting to ready a transaction that has an open cursor.");
+        Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
+
+        LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
+        rootModification = modification.seal();
+
+        cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
+                rootShardDataTree, rootModification, changePublisher));
+        for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
+                modification.getChildShards().entrySet()) {
+            cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
+        }
+        finished = true;
+    }
 
-        modification.seal();
+    @Override
+    public ListenableFuture<Void> submit() {
+        LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
 
+        Preconditions.checkNotNull(cohorts);
+        Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
 
-        return;
+        final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(
+                modification.getPrefix(), cohorts));
+
+        return submit;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> validate() {
+        LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
+
+        final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(
+                modification.getPrefix(), cohorts));
+        return submit;
     }
 
-    public void followUp() {
+    @Override
+    public ListenableFuture<Void> prepare() {
+        LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
+
+        final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(
+                modification.getPrefix(), cohorts));
+        return submit;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
 
+        final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(
+                modification.getPrefix(), cohorts));
+        return submit;
     }
 
     @Override
     public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
-        DOMDataTreeWriteCursor ret = getCursor();
-        YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
+        Preconditions.checkState(!finished, "Transaction is finished/closed already.");
+        Preconditions.checkState(cursor == null, "Previous cursor wasn't closed");
+        final DOMDataTreeWriteCursor ret = getCursor();
+        final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
         ret.enter(relativePath.getPathArguments());
         return ret;
     }