Merge "Fix warnings in config-manager"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index ffb1ab7c55064ecf7683e1857800b256c9c82a1d..d93bae22e08d9fddb3f1ac7d9c12aa4a278d0ff6 100644 (file)
@@ -21,6 +21,14 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -50,15 +58,6 @@ import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
-import javax.annotation.concurrent.GuardedBy;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
  * <p>
@@ -182,23 +181,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final TransactionType transactionType;
     private final ActorContext actorContext;
     private final TransactionIdentifier identifier;
-    private final TransactionChainProxy transactionChainProxy;
+    private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
-        this(actorContext, transactionType, null);
+        this(actorContext, transactionType, "");
     }
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            TransactionChainProxy transactionChainProxy) {
+            String transactionChainId) {
         this.actorContext = Preconditions.checkNotNull(actorContext,
             "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType,
             "transactionType should not be null");
         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
             "schemaContext should not be null");
-        this.transactionChainProxy = transactionChainProxy;
+        this.transactionChainId = transactionChainId;
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
@@ -237,6 +236,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return recordedOperationFutures;
     }
 
+    @VisibleForTesting
+    boolean hasTransactionContext() {
+        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+            if(transactionContext != null) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
@@ -433,14 +444,32 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-        if(transactionChainProxy != null){
-            transactionChainProxy.onTransactionReady(cohortFutures);
-        }
+        onTransactionReady(cohortFutures);
 
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
                 identifier.toString());
     }
 
+    /**
+     * Method for derived classes to be notified when the transaction has been readied.
+     *
+     * @param cohortFutures the cohort Futures for each shard transaction.
+     */
+    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+    }
+
+    /**
+     * Method called to send a CreateTransaction message to a shard.
+     *
+     * @param shard the shard actor to send to
+     * @param serializedCreateMessage the serialized message to send
+     * @return the response Future
+     */
+    protected Future<Object> sendCreateTransaction(ActorSelection shard,
+            Object serializedCreateMessage) {
+        return actorContext.executeOperationAsync(shard, serializedCreateMessage);
+    }
+
     @Override
     public Object getIdentifier() {
         return this.identifier;
@@ -502,10 +531,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     public String getTransactionChainId() {
-        if(transactionChainProxy == null){
-            return "";
-        }
-        return transactionChainProxy.getTransactionChainId();
+        return transactionChainId;
     }
 
     /**
@@ -591,7 +617,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
+            Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
                     new CreateTransaction(identifier.toString(),
                             TransactionProxy.this.transactionType.ordinal(),
                             getTransactionChainId()).toSerializable());
@@ -626,29 +652,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // respect to #addTxOperationOnComplete to handle timing issues and ensure no
             // TransactionOperation is missed and that they are processed in the order they occurred.
             synchronized(txOperationsOnComplete) {
+                // Store the new TransactionContext locally until we've completed invoking the
+                // TransactionOperations. This avoids thread timing issues which could cause
+                // out-of-order TransactionOperations. Eg, on a modification operation, if the
+                // TransactionContext is non-null, then we directly call the TransactionContext.
+                // However, at the same time, the code may be executing the cached
+                // TransactionOperations. So to avoid thus timing, we don't publish the
+                // TransactionContext until after we've executed all cached TransactionOperations.
+                TransactionContext localTransactionContext;
                 if(failure != null) {
                     LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
                             failure.getMessage());
 
-                    transactionContext = new NoOpTransactionContext(failure, identifier);
+                    localTransactionContext = new NoOpTransactionContext(failure, identifier);
                 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
-                    createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
+                    localTransactionContext = createValidTransactionContext(
+                            CreateTransactionReply.fromSerializable(response));
                 } else {
                     IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    transactionContext = new NoOpTransactionContext(exception, identifier);
+                    localTransactionContext = new NoOpTransactionContext(exception, identifier);
                 }
 
                 for(TransactionOperation oper: txOperationsOnComplete) {
-                    oper.invoke(transactionContext);
+                    oper.invoke(localTransactionContext);
                 }
 
                 txOperationsOnComplete.clear();
+
+                // We're done invoking the TransactionOperations so we can now publish the
+                // TransactionContext.
+                transactionContext = localTransactionContext;
             }
         }
 
-        private void createValidTransactionContext(CreateTransactionReply reply) {
+        private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
             String transactionPath = reply.getTransactionPath();
 
             LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
@@ -669,7 +708,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
 
-            transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+            return new TransactionContextImpl(transactionPath, transactionActor, identifier,
                 actorContext, schemaContext, isTxActorLocal, reply.getVersion());
         }
     }