Fix InMemory shard transaction chaining.
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InmemoryDOMDataTreeShardWriteTransaction.java
index 09e07b52776d1d54db367779cc7ed0caa817721d..d3265fdab3716d05d95bad6f62e43c51ca547459 100644 (file)
@@ -13,16 +13,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -31,7 +30,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
 
     private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
 
@@ -63,9 +62,9 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         void apply(final DOMDataTreeWriteCursor cursor, final YangInstanceIdentifier path,
                 final NormalizedNode<?, ?> data) {
             int enterCount = 0;
-            Iterator<PathArgument> it = path.getPathArguments().iterator();
+            final Iterator<PathArgument> it = path.getPathArguments().iterator();
             while (it.hasNext()) {
-                PathArgument currentArg = it.next();
+                final PathArgument currentArg = it.next();
                 if (it.hasNext()) {
                     // We need to enter one level deeper, we are not at leaf (modified) node
                     cursor.enter(currentArg);
@@ -78,34 +77,48 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         }
     }
 
-    private final ShardDataModification modification;
-    private DOMDataTreeWriteCursor cursor;
-    private DataTree rootShardDataTree;
-    private DataTreeModification rootModification = null;
+    private static final AtomicLong COUNTER = new AtomicLong();
 
-    private ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
-    private InMemoryDOMDataTreeShardChangePublisher changePublisher;
+    private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
+    private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
+    private final InMemoryDOMDataTreeShardProducer producer;
+    private final ShardDataModification modification;
+    private final ListeningExecutorService executor;
+    private final DataTree rootShardDataTree;
+    private final String identifier;
 
-    // FIXME inject into shard?
-    private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+    private DataTreeModification rootModification = null;
+    private DOMDataTreeWriteCursor cursor;
+    private boolean finished = false;
 
-    InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root,
+    InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
+                                             final ShardDataModification root,
                                              final DataTree rootShardDataTree,
-                                             final InMemoryDOMDataTreeShardChangePublisher changePublisher) {
+                                             final InMemoryDOMDataTreeShardChangePublisher changePublisher,
+                                             final ListeningExecutorService executor) {
+        this.producer = producer;
         this.modification = Preconditions.checkNotNull(root);
         this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
         this.changePublisher = Preconditions.checkNotNull(changePublisher);
+        this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
+        LOG.debug("Shard transaction{} created", identifier);
+        this.executor = executor;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
     }
 
     private DOMDataTreeWriteCursor getCursor() {
         if (cursor == null) {
-            cursor = new ShardDataModificationCursor(modification);
+            cursor = new ShardDataModificationCursor(modification, this);
         }
         return cursor;
     }
 
     void delete(final YangInstanceIdentifier path) {
-        YangInstanceIdentifier relativePath = toRelative(path);
+        final YangInstanceIdentifier relativePath = toRelative(path);
         Preconditions.checkArgument(!YangInstanceIdentifier.EMPTY.equals(relativePath),
                 "Deletion of shard root is not allowed");
         SimpleCursorOperation.DELETE.apply(getCursor(), relativePath , null);
@@ -120,42 +133,58 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     }
 
     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
-        Optional<YangInstanceIdentifier> relative =
+        final Optional<YangInstanceIdentifier> relative =
                 path.relativeTo(modification.getPrefix().getRootIdentifier());
         Preconditions.checkArgument(relative.isPresent());
         return relative.get();
     }
 
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
-        // FIXME: Implement this
-        return null;
+        throw new UnsupportedOperationException("Not implemented yet");
     }
 
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
-        // TODO Auto-generated method stub
-        return null;
+        throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Override
+    public void close() {
+        Preconditions.checkState(!finished, "Attempting to close an already finished transaction.");
+        modification.closeTransactions();
+        if (cursor != null) {
+            cursor.close();
+        }
+        producer.transactionAborted(this);
+        finished = true;
+    }
 
-    public Object getIdentifier() {
-        // TODO Auto-generated method stub
-        return null;
+    void cursorClosed() {
+        Preconditions.checkNotNull(cursor);
+        modification.closeCursor();
+        cursor = null;
     }
 
-    public void close() {
-        // TODO Auto-generated method stub
+    public boolean isFinished() {
+        return finished;
     }
 
     @Override
     public void ready() {
+        Preconditions.checkState(!finished, "Attempting to ready an already finished transaction.");
+        Preconditions.checkState(cursor == null, "Attempting to ready a transaction that has an open cursor.");
+        Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
 
         LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
         rootModification = modification.seal();
 
-        cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification, changePublisher));
-        for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry : modification.getChildShards().entrySet()) {
+        producer.transactionReady(this, rootModification);
+        cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
+                rootShardDataTree, rootModification, changePublisher));
+        for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
+                modification.getChildShards().entrySet()) {
             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
         }
+        finished = true;
     }
 
     @Override
@@ -163,9 +192,10 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
 
         Preconditions.checkNotNull(cohorts);
-        Preconditions.checkState(!cohorts.isEmpty(), "Submitting an empty transaction");
+        Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
 
-        final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(
+                modification.getPrefix(), cohorts, this));
 
         return submit;
     }
@@ -174,7 +204,8 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     public ListenableFuture<Boolean> validate() {
         LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
 
-        final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(
+                modification.getPrefix(), cohorts));
         return submit;
     }
 
@@ -182,7 +213,8 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     public ListenableFuture<Void> prepare() {
         LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
 
-        final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(
+                modification.getPrefix(), cohorts));
         return submit;
     }
 
@@ -190,18 +222,26 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     public ListenableFuture<Void> commit() {
         LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
 
-        final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts));
+        final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(
+                modification.getPrefix(), cohorts, this));
         return submit;
     }
 
-    public void followUp() {
+    DataTreeModification getRootModification() {
+        Preconditions.checkNotNull(rootModification, "Transaction wasn't sealed yet");
+        return rootModification;
+    }
 
+    void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
+        producer.onTransactionCommited(tx);
     }
 
     @Override
     public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
-        DOMDataTreeWriteCursor ret = getCursor();
-        YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
+        Preconditions.checkState(!finished, "Transaction is finished/closed already.");
+        Preconditions.checkState(cursor == null, "Previous cursor wasn't closed");
+        final DOMDataTreeWriteCursor ret = getCursor();
+        final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
         ret.enter(relativePath.getPathArguments());
         return ret;
     }