Bug 4202: submit shard transactions
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InmemoryDOMDataTreeShardWriteTransaction.java
index 0a754a5615000d81968caeca851af2978ad6acc4..4634d4f70498fb018bf1bb3db4c764b7b78d3527 100644 (file)
@@ -11,15 +11,30 @@ package org.opendaylight.mdsal.dom.store.inmemory;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
 import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
+
     private enum SimpleCursorOperation {
         MERGE {
             @Override
@@ -63,11 +78,20 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         }
     }
 
+    private InMemoryDOMDataTreeShardThreePhaseCommitCohort commitCohort;
     private final ShardDataModification modification;
     private DOMDataTreeWriteCursor cursor;
+    private DataTree rootShardDataTree;
+    private DataTreeModification rootModification = null;
+
+    private ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
 
-    InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root) {
+    // FIXME inject into shard?
+    private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+
+    InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root, final DataTree rootShardDataTree) {
         this.modification = Preconditions.checkNotNull(root);
+        this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
     }
 
     private DOMDataTreeWriteCursor getCursor() {
@@ -122,10 +146,49 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     @Override
     public void ready() {
 
-        modification.seal();
+        LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
+        rootModification = modification.seal();
+
+        cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification));
+        for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry : modification.getChildShards().entrySet()) {
+            cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> submit() {
+        LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
+
+        Preconditions.checkNotNull(cohorts);
+        Preconditions.checkState(!cohorts.isEmpty(), "Submitting an empty transaction");
+
+        final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts));
 
+        return submit;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> validate() {
+        LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
+
+        final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
+        return submit;
+    }
+
+    @Override
+    public ListenableFuture<Void> prepare() {
+        LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
+
+        final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
+        return submit;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
 
-        return;
+        final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts));
+        return submit;
     }
 
     public void followUp() {