Fix InMemory shard transaction chaining.
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InmemoryDOMDataTreeShardWriteTransaction.java
index 1aaa459b231716618b62f924788b3c03193beb20..d3265fdab3716d05d95bad6f62e43c51ca547459 100644 (file)
@@ -13,15 +13,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map.Entry;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -30,7 +30,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
 
     private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
 
@@ -77,24 +77,37 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         }
     }
 
-    private final ShardDataModification modification;
-    private DOMDataTreeWriteCursor cursor;
-    private final DataTree rootShardDataTree;
-    private DataTreeModification rootModification = null;
+    private static final AtomicLong COUNTER = new AtomicLong();
 
     private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
     private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
-    private boolean finished = false;
+    private final InMemoryDOMDataTreeShardProducer producer;
+    private final ShardDataModification modification;
+    private final ListeningExecutorService executor;
+    private final DataTree rootShardDataTree;
+    private final String identifier;
 
-    // FIXME inject into shard?
-    private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+    private DataTreeModification rootModification = null;
+    private DOMDataTreeWriteCursor cursor;
+    private boolean finished = false;
 
-    InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root,
+    InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
+                                             final ShardDataModification root,
                                              final DataTree rootShardDataTree,
-                                             final InMemoryDOMDataTreeShardChangePublisher changePublisher) {
+                                             final InMemoryDOMDataTreeShardChangePublisher changePublisher,
+                                             final ListeningExecutorService executor) {
+        this.producer = producer;
         this.modification = Preconditions.checkNotNull(root);
         this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
         this.changePublisher = Preconditions.checkNotNull(changePublisher);
+        this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
+        LOG.debug("Shard transaction{} created", identifier);
+        this.executor = executor;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
     }
 
     private DOMDataTreeWriteCursor getCursor() {
@@ -141,11 +154,13 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         if (cursor != null) {
             cursor.close();
         }
+        producer.transactionAborted(this);
         finished = true;
     }
 
     void cursorClosed() {
         Preconditions.checkNotNull(cursor);
+        modification.closeCursor();
         cursor = null;
     }
 
@@ -162,8 +177,11 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
         rootModification = modification.seal();
 
-        cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification, changePublisher));
-        for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry : modification.getChildShards().entrySet()) {
+        producer.transactionReady(this, rootModification);
+        cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
+                rootShardDataTree, rootModification, changePublisher));
+        for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
+                modification.getChildShards().entrySet()) {
             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
         }
         finished = true;
@@ -176,7 +194,8 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         Preconditions.checkNotNull(cohorts);
         Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
 
-        final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(
+                modification.getPrefix(), cohorts, this));
 
         return submit;
     }
@@ -185,7 +204,8 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     public ListenableFuture<Boolean> validate() {
         LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
 
-        final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(
+                modification.getPrefix(), cohorts));
         return submit;
     }
 
@@ -193,7 +213,8 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     public ListenableFuture<Void> prepare() {
         LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
 
-        final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(
+                modification.getPrefix(), cohorts));
         return submit;
     }
 
@@ -201,12 +222,18 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     public ListenableFuture<Void> commit() {
         LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
 
-        final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(
+                modification.getPrefix(), cohorts, this));
         return submit;
     }
 
-    public void followUp() {
+    DataTreeModification getRootModification() {
+        Preconditions.checkNotNull(rootModification, "Transaction wasn't sealed yet");
+        return rootModification;
+    }
 
+    void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
+        producer.onTransactionCommited(tx);
     }
 
     @Override