Bug 2318: Ensure previous Tx in chain is readied before creating the next
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index ffb1ab7c55064ecf7683e1857800b256c9c82a1d..40880d907580bbaec6a81756d07b3b8b37210ade 100644 (file)
@@ -21,6 +21,14 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -50,15 +58,6 @@ import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
-import javax.annotation.concurrent.GuardedBy;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
  * <p>
@@ -182,23 +181,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final TransactionType transactionType;
     private final ActorContext actorContext;
     private final TransactionIdentifier identifier;
-    private final TransactionChainProxy transactionChainProxy;
+    private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
-        this(actorContext, transactionType, null);
+        this(actorContext, transactionType, "");
     }
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            TransactionChainProxy transactionChainProxy) {
+            String transactionChainId) {
         this.actorContext = Preconditions.checkNotNull(actorContext,
             "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType,
             "transactionType should not be null");
         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
             "schemaContext should not be null");
-        this.transactionChainProxy = transactionChainProxy;
+        this.transactionChainId = transactionChainId;
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
@@ -433,14 +432,32 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-        if(transactionChainProxy != null){
-            transactionChainProxy.onTransactionReady(cohortFutures);
-        }
+        onTransactionReady(cohortFutures);
 
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
                 identifier.toString());
     }
 
+    /**
+     * Method for derived classes to be notified when the transaction has been readied.
+     *
+     * @param cohortFutures the cohort Futures for each shard transaction.
+     */
+    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+    }
+
+    /**
+     * Method called to send a CreateTransaction message to a shard.
+     *
+     * @param shard the shard actor to send to
+     * @param serializedCreateMessage the serialized message to send
+     * @return the response Future
+     */
+    protected Future<Object> sendCreateTransaction(ActorSelection shard,
+            Object serializedCreateMessage) {
+        return actorContext.executeOperationAsync(shard, serializedCreateMessage);
+    }
+
     @Override
     public Object getIdentifier() {
         return this.identifier;
@@ -502,10 +519,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     public String getTransactionChainId() {
-        if(transactionChainProxy == null){
-            return "";
-        }
-        return transactionChainProxy.getTransactionChainId();
+        return transactionChainId;
     }
 
     /**
@@ -591,7 +605,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
+            Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
                     new CreateTransaction(identifier.toString(),
                             TransactionProxy.this.transactionType.ordinal(),
                             getTransactionChainId()).toSerializable());