Merge "Updating features archetype to talk about user-facing features."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 239207a60ab58cbbbefc03ee150b9cc250230d86..ebed05b6a7126170f2bba39bff4ced72bf9908a2 100644 (file)
@@ -21,6 +21,14 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -49,14 +57,6 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
 
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
@@ -181,23 +181,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final TransactionType transactionType;
     private final ActorContext actorContext;
     private final TransactionIdentifier identifier;
-    private final TransactionChainProxy transactionChainProxy;
+    private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
-        this(actorContext, transactionType, null);
+        this(actorContext, transactionType, "");
     }
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            TransactionChainProxy transactionChainProxy) {
+            String transactionChainId) {
         this.actorContext = Preconditions.checkNotNull(actorContext,
             "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType,
             "transactionType should not be null");
         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
             "schemaContext should not be null");
-        this.transactionChainProxy = transactionChainProxy;
+        this.transactionChainId = transactionChainId;
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
@@ -220,7 +220,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             phantomReferenceCache.put(cleanup, cleanup);
         }
 
-        LOG.debug("Created txn {} of type {}", identifier, transactionType);
+        LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
     }
 
     @VisibleForTesting
@@ -236,6 +236,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return recordedOperationFutures;
     }
 
+    @VisibleForTesting
+    boolean hasTransactionContext() {
+        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+            if(transactionContext != null) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
@@ -411,8 +423,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
-            LOG.debug("Tx {} Readying transaction for shard {}", identifier,
-                        txFutureCallback.getShardName());
+            LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier,
+                        txFutureCallback.getShardName(), transactionChainId);
 
             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
             if(transactionContext != null) {
@@ -432,14 +444,32 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-        if(transactionChainProxy != null){
-            transactionChainProxy.onTransactionReady(cohortFutures);
-        }
+        onTransactionReady(cohortFutures);
 
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
                 identifier.toString());
     }
 
+    /**
+     * Method for derived classes to be notified when the transaction has been readied.
+     *
+     * @param cohortFutures the cohort Futures for each shard transaction.
+     */
+    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+    }
+
+    /**
+     * Method called to send a CreateTransaction message to a shard.
+     *
+     * @param shard the shard actor to send to
+     * @param serializedCreateMessage the serialized message to send
+     * @return the response Future
+     */
+    protected Future<Object> sendCreateTransaction(ActorSelection shard,
+            Object serializedCreateMessage) {
+        return actorContext.executeOperationAsync(shard, serializedCreateMessage);
+    }
+
     @Override
     public Object getIdentifier() {
         return this.identifier;
@@ -501,10 +531,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     public String getTransactionChainId() {
-        if(transactionChainProxy == null){
-            return "";
-        }
-        return transactionChainProxy.getTransactionChainId();
+        return transactionChainId;
+    }
+
+    protected ActorContext getActorContext() {
+        return actorContext;
     }
 
     /**
@@ -590,7 +621,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
+            Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
                     new CreateTransaction(identifier.toString(),
                             TransactionProxy.this.transactionType.ordinal(),
                             getTransactionChainId()).toSerializable());
@@ -625,29 +656,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // respect to #addTxOperationOnComplete to handle timing issues and ensure no
             // TransactionOperation is missed and that they are processed in the order they occurred.
             synchronized(txOperationsOnComplete) {
+                // Store the new TransactionContext locally until we've completed invoking the
+                // TransactionOperations. This avoids thread timing issues which could cause
+                // out-of-order TransactionOperations. Eg, on a modification operation, if the
+                // TransactionContext is non-null, then we directly call the TransactionContext.
+                // However, at the same time, the code may be executing the cached
+                // TransactionOperations. So to avoid thus timing, we don't publish the
+                // TransactionContext until after we've executed all cached TransactionOperations.
+                TransactionContext localTransactionContext;
                 if(failure != null) {
                     LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
                             failure.getMessage());
 
-                    transactionContext = new NoOpTransactionContext(failure, identifier);
+                    localTransactionContext = new NoOpTransactionContext(failure, identifier);
                 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
-                    createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
+                    localTransactionContext = createValidTransactionContext(
+                            CreateTransactionReply.fromSerializable(response));
                 } else {
                     IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    transactionContext = new NoOpTransactionContext(exception, identifier);
+                    localTransactionContext = new NoOpTransactionContext(exception, identifier);
                 }
 
                 for(TransactionOperation oper: txOperationsOnComplete) {
-                    oper.invoke(transactionContext);
+                    oper.invoke(localTransactionContext);
                 }
 
                 txOperationsOnComplete.clear();
+
+                // We're done invoking the TransactionOperations so we can now publish the
+                // TransactionContext.
+                transactionContext = localTransactionContext;
             }
         }
 
-        private void createValidTransactionContext(CreateTransactionReply reply) {
+        private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
             String transactionPath = reply.getTransactionPath();
 
             LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
@@ -666,10 +710,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
             // TxActor is always created where the leader of the shard is.
             // Check if TxActor is created in the same node
-            boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
+            boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
-            transactionContext = new TransactionContextImpl(transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal);
+            return new TransactionContextImpl(transactionPath, transactionActor, identifier,
+                actorContext, schemaContext, isTxActorLocal, reply.getVersion());
         }
     }
 
@@ -712,17 +756,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         private final ActorContext actorContext;
         private final SchemaContext schemaContext;
+        private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
+        private final int remoteTransactionVersion;
 
-        private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+        private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal) {
+                boolean isTxActorLocal, int remoteTransactionVersion) {
             super(identifier);
+            this.transactionPath = transactionPath;
             this.actor = actor;
             this.actorContext = actorContext;
             this.schemaContext = schemaContext;
             this.isTxActorLocal = isTxActorLocal;
+            this.remoteTransactionVersion = remoteTransactionVersion;
         }
 
         private ActorSelection getActor() {
@@ -783,7 +831,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                     } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                        return actorContext.actorSelection(reply.getCohortPath());
+                        String cohortPath = reply.getCohortPath();
+
+                        // In Helium we used to return the local path of the actor which represented
+                        // a remote ThreePhaseCommitCohort. The local path would then be converted to
+                        // a remote path using this resolvePath method. To maintain compatibility with
+                        // a Helium node we need to continue to do this conversion.
+                        // At some point in the future when upgrades from Helium are not supported
+                        // we could remove this code to resolvePath and just use the cohortPath as the
+                        // resolved cohortPath
+                        if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+                            cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
+                        }
+
+                        return actorContext.actorSelection(cohortPath);
 
                     } else {
                         // Throwing an exception here will fail the Future.