Bug 3195: Cleanup on error paths and error handling
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
index 78e059c798bbacea4f2e6b50588ede4d464bd8a0..5f9cc4a0d21768ee922b99dd70d0111b7f9508bf 100644 (file)
@@ -19,7 +19,9 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,8 +32,7 @@ import scala.util.Try;
  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
  * transaction factories.
  */
-abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory>
-        implements ShardInfoListener, AutoCloseable {
+abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
 
     protected static final AtomicLong TX_COUNTER = new AtomicLong();
@@ -61,7 +62,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
     }
 
     private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
-            String shardName, TransactionContextWrapper transactionContextAdapter) {
+            String shardName, TransactionContextWrapper transactionContextWrapper) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
                     primaryShardInfo.getPrimaryShardActor(), shardName);
@@ -71,80 +72,63 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
 
         TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
         if(localContext != null) {
-            transactionContextAdapter.executePriorTransactionOperations(localContext);
+            transactionContextWrapper.executePriorTransactionOperations(localContext);
         } else {
-            RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextAdapter,
+            RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
                     parent, shardName);
-            remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
+            remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor(), primaryShardInfo.getPrimaryShardVersion());
         }
     }
 
     private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
-            String shardName, TransactionContextWrapper transactionContextAdapter) {
+            String shardName, TransactionContextWrapper transactionContextWrapper) {
         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
 
-        transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
-                parent.getIdentifier(), parent.getLimiter()));
+        transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
+                parent.getIdentifier()));
     }
 
-    final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
-        final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier());
+    final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
+        final TransactionContextWrapper transactionContextWrapper =
+                new TransactionContextWrapper(parent.getIdentifier(), actorContext);
 
         Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
         if(findPrimaryFuture.isCompleted()) {
             Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
             if(maybe.isSuccess()) {
-                onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextAdapter);
+                onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
             } else {
-                onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextAdapter);
+                onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
             }
         } else {
             findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
                 @Override
                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
                     if (failure == null) {
-                        onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextAdapter);
+                        onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
                     } else {
-                        onFindPrimaryShardFailure(failure, parent, shardName, transactionContextAdapter);
+                        onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
                     }
                 }
             }, actorContext.getClientDispatcher());
         }
 
-        return transactionContextAdapter;
+        return transactionContextWrapper;
     }
 
     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
         final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
         if (maybeDataTree.isPresent()) {
-            knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get()));
-            LOG.debug("Shard {} resolved to local data tree", shardName);
-        }
-    }
-
-    @Override
-    public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
-        final F existing = knownLocal.get(shardName);
-        if (existing != null) {
-            if (primaryShardInfo != null) {
-                final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
-                if (maybeDataTree.isPresent()) {
-                    final DataTree newDataTree = maybeDataTree.get();
-                    final DataTree oldDataTree = dataTreeForFactory(existing);
-                    if (!oldDataTree.equals(newDataTree)) {
-                        final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree);
-                        knownLocal.replace(shardName, existing, newChain);
-                        LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree);
-                    }
+            if(!knownLocal.containsKey(shardName)) {
+                LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
 
-                    return;
-                }
-            }
-            if (knownLocal.remove(shardName, existing)) {
-                LOG.debug("Shard {} invalidated data tree {}", shardName, existing);
-            } else {
-                LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing);
+                F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
+                knownLocal.putIfAbsent(shardName, factory);
             }
+        } else if(knownLocal.containsKey(shardName)) {
+            LOG.debug("Shard {} invalidating local data tree", shardName);
+
+            knownLocal.remove(shardName);
         }
     }
 
@@ -184,14 +168,6 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
      */
     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
 
-    /**
-     * Extract the backing data tree from a particular factory.
-     *
-     * @param factory Transaction factory
-     * @return Backing data tree
-     */
-    protected abstract DataTree dataTreeForFactory(F factory);
-
     /**
      * Callback invoked from child transactions to push any futures, which need to
      * be waited for before the next transaction is allocated.
@@ -199,7 +175,51 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
      */
     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
 
-    private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
-        return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()));
+    private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
+                                                                    final TransactionProxy parent) {
+
+        switch(parent.getType()) {
+            case READ_ONLY:
+                final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        return readOnly;
+                    }
+                };
+            case READ_WRITE:
+                final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        return readWrite;
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        return readWrite;
+                    }
+                };
+            case WRITE_ONLY:
+                final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
+                return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
+                    @Override
+                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                        return writeOnly;
+                    }
+
+                    @Override
+                    protected DOMStoreReadTransaction getReadDelegate() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+             default:
+                 throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
+        }
     }
 }