Bug 4774: Wait for prior RO tx creates on tx chain
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
index 78e059c798bbacea4f2e6b50588ede4d464bd8a0..4fda059f3182ee898f7f4bc076ae7800159b3a77 100644 (file)
@@ -19,7 +19,9 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,8 +32,7 @@ import scala.util.Try;
  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
  * transaction factories.
  */
-abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory>
-        implements ShardInfoListener, AutoCloseable {
+abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
 
     protected static final AtomicLong TX_COUNTER = new AtomicLong();
@@ -54,14 +55,19 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
                         parent.getIdentifier(), shardName, local);
             }
-            return createLocalTransactionContext(local, parent);
+
+            try {
+                return createLocalTransactionContext(local, parent);
+            } catch(Exception e) {
+                return new NoOpTransactionContext(e, parent.getIdentifier());
+            }
         }
 
         return null;
     }
 
     private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
-            String shardName, TransactionContextWrapper transactionContextAdapter) {
+            String shardName, TransactionContextWrapper transactionContextWrapper) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
                     primaryShardInfo.getPrimaryShardActor(), shardName);
@@ -69,82 +75,73 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
 
         updateShardInfo(shardName, primaryShardInfo);
 
-        TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
-        if(localContext != null) {
-            transactionContextAdapter.executePriorTransactionOperations(localContext);
-        } else {
-            RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextAdapter,
-                    parent, shardName);
-            remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
+        try {
+            TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
+            if(localContext != null) {
+                transactionContextWrapper.executePriorTransactionOperations(localContext);
+            } else {
+                RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
+                        parent, shardName);
+                remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor(), primaryShardInfo.getPrimaryShardVersion());
+            }
+        } finally {
+            onTransactionContextCreated(parent.getIdentifier());
         }
     }
 
     private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
-            String shardName, TransactionContextWrapper transactionContextAdapter) {
+            String shardName, TransactionContextWrapper transactionContextWrapper) {
         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
 
-        transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
-                parent.getIdentifier(), parent.getLimiter()));
+        try {
+            transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
+                    parent.getIdentifier()));
+        } finally {
+            onTransactionContextCreated(parent.getIdentifier());
+        }
     }
 
-    final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
-        final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier());
+    final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
+        final TransactionContextWrapper transactionContextWrapper =
+                new TransactionContextWrapper(parent.getIdentifier(), actorContext);
 
-        Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
+        Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
         if(findPrimaryFuture.isCompleted()) {
             Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
             if(maybe.isSuccess()) {
-                onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextAdapter);
+                onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
             } else {
-                onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextAdapter);
+                onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
             }
         } else {
             findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
                 @Override
                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
                     if (failure == null) {
-                        onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextAdapter);
+                        onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
                     } else {
-                        onFindPrimaryShardFailure(failure, parent, shardName, transactionContextAdapter);
+                        onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
                     }
                 }
             }, actorContext.getClientDispatcher());
         }
 
-        return transactionContextAdapter;
+        return transactionContextWrapper;
     }
 
     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
         final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
         if (maybeDataTree.isPresent()) {
-            knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get()));
-            LOG.debug("Shard {} resolved to local data tree", shardName);
-        }
-    }
+            if(!knownLocal.containsKey(shardName)) {
+                LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
 
-    @Override
-    public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
-        final F existing = knownLocal.get(shardName);
-        if (existing != null) {
-            if (primaryShardInfo != null) {
-                final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
-                if (maybeDataTree.isPresent()) {
-                    final DataTree newDataTree = maybeDataTree.get();
-                    final DataTree oldDataTree = dataTreeForFactory(existing);
-                    if (!oldDataTree.equals(newDataTree)) {
-                        final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree);
-                        knownLocal.replace(shardName, existing, newChain);
-                        LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree);
-                    }
-
-                    return;
-                }
-            }
-            if (knownLocal.remove(shardName, existing)) {
-                LOG.debug("Shard {} invalidated data tree {}", shardName, existing);
-            } else {
-                LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing);
+                F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
+                knownLocal.putIfAbsent(shardName, factory);
             }
+        } else if(knownLocal.containsKey(shardName)) {
+            LOG.debug("Shard {} invalidating local data tree", shardName);
+
+            knownLocal.remove(shardName);
         }
     }
 
@@ -170,7 +167,8 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
      * @param shardName Shard name
      * @return Future containing shard information.
      */
-    protected abstract Future<PrimaryShardInfo> findPrimaryShard(String shardName);
+    protected abstract Future<PrimaryShardInfo> findPrimaryShard(@Nonnull String shardName,
+            @Nonnull TransactionIdentifier txId);
 
     /**
      * Create local transaction factory for specified shard, backed by specified shard leader
@@ -184,14 +182,6 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
      */
     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
 
-    /**
-     * Extract the backing data tree from a particular factory.
-     *
-     * @param factory Transaction factory
-     * @return Backing data tree
-     */
-    protected abstract DataTree dataTreeForFactory(F factory);
-
     /**
      * Callback invoked from child transactions to push any futures, which need to
      * be waited for before the next transaction is allocated.
@@ -199,7 +189,58 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
      */
     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
 
-    private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
-        return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()));
+    /**
+     * Callback invoked when the internal TransactionContext has been created for a transaction.
+     *
+     * @param transactionId the ID of the transaction.
+     */
+    protected abstract void onTransactionContextCreated(@Nonnull TransactionIdentifier transactionId);
+
+    private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
+                                                                    final TransactionProxy parent) {
+
+        switch(parent.getType()) {
+            case READ_ONLY:
+                final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        return readOnly;
+                    }
+                };
+            case READ_WRITE:
+                final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        return readWrite;
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        return readWrite;
+                    }
+                };
+            case WRITE_ONLY:
+                final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        return writeOnly;
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+             default:
+                 throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
+        }
     }
 }