CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RemoteTransactionContextSupport.java
@@ -1,4 +1,5 @@
 /*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -10,13 +11,9 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -28,46 +25,40 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
- * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
- * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
- * retry task after a short delay.
+ * Handles creation of TransactionContext instances for remote transactions. This class creates
+ * remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit,
+ * if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay.
  * <p>
  * The end result from a completed CreateTransaction message is a TransactionContext that is
  * used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cache and executed once the CreateTransaction completes,
- * successfully or not.
+ * CreateTransaction completes are cache via a TransactionContextWrapper and executed once the
+ * CreateTransaction completes, successfully or not.
  */
-final class TransactionFutureCallback extends OnComplete<Object> {
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionFutureCallback.class);
+final class RemoteTransactionContextSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class);
 
     /**
      * Time interval in between transaction create retries.
      */
     private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS);
 
-    /**
-     * The list of transaction operations to execute once the CreateTransaction completes.
-     */
-    @GuardedBy("txOperationsOnComplete")
-    private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
-    private final TransactionProxy proxy;
+    private final TransactionProxy parent;
     private final String shardName;
 
-    /**
-     * The TransactionContext resulting from the CreateTransaction reply.
-     */
-    private volatile TransactionContext transactionContext;
-
     /**
      * The target primary shard.
      */
     private volatile ActorSelection primaryShard;
     private volatile int createTxTries;
 
-    TransactionFutureCallback(final TransactionProxy proxy, final String shardName) {
-        this.proxy = Preconditions.checkNotNull(proxy);
+    private final TransactionContextWrapper transactionContextAdapter;
+
+    RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextAdapter, final TransactionProxy parent,
+            final String shardName) {
+        this.parent = Preconditions.checkNotNull(parent);
         this.shardName = shardName;
-        createTxTries = (int) (proxy.getActorContext().getDatastoreContext().
+        this.transactionContextAdapter = transactionContextAdapter;
+        createTxTries = (int) (parent.getActorContext().getDatastoreContext().
                 getShardLeaderElectionTimeout().duration().toMillis() /
                 CREATE_TX_TRY_INTERVAL.toMillis());
     }
@@ -76,24 +67,20 @@ final class TransactionFutureCallback extends OnComplete<Object> {
         return shardName;
     }
 
-    TransactionContext getTransactionContext() {
-        return transactionContext;
-    }
-
     private TransactionType getTransactionType() {
-        return proxy.getTransactionType();
-    }
-
-    private TransactionIdentifier getIdentifier() {
-        return proxy.getIdentifier();
+        return parent.getType();
     }
 
     private ActorContext getActorContext() {
-        return proxy.getActorContext();
+        return parent.getActorContext();
     }
 
     private Semaphore getOperationLimiter() {
-        return proxy.getOperationLimiter();
+        return parent.getLimiter();
+    }
+
+    private TransactionIdentifier getIdentifier() {
+        return parent.getIdentifier();
     }
 
     /**
@@ -110,43 +97,13 @@ final class TransactionFutureCallback extends OnComplete<Object> {
             // For write-only Tx's we prepare the transaction modifications directly on the shard actor
             // to avoid the overhead of creating a separate transaction actor.
             // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
-            executeTxOperatonsOnComplete(proxy.createValidTransactionContext(this.primaryShard,
+            transactionContextAdapter.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard,
                     this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
         } else {
             tryCreateTransaction();
         }
     }
 
-    /**
-     * Adds a TransactionOperation to be executed after the CreateTransaction completes.
-     */
-    private void addTxOperationOnComplete(TransactionOperation operation) {
-        boolean invokeOperation = true;
-        synchronized(txOperationsOnComplete) {
-            if(transactionContext == null) {
-                LOG.debug("Tx {} Adding operation on complete", getIdentifier());
-
-                invokeOperation = false;
-                txOperationsOnComplete.add(operation);
-            }
-        }
-
-        if(invokeOperation) {
-            operation.invoke(transactionContext);
-        }
-    }
-
-    void enqueueTransactionOperation(final TransactionOperation op) {
-
-        if (transactionContext != null) {
-            op.invoke(transactionContext);
-        } else {
-            // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-            // callback to be executed after the Tx is created.
-            addTxOperationOnComplete(op);
-        }
-    }
-
     /**
      * Performs a CreateTransaction try async.
      */
@@ -156,15 +113,19 @@ final class TransactionFutureCallback extends OnComplete<Object> {
         }
 
         Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
-            getTransactionType().ordinal(), proxy.getTransactionChainId()).toSerializable();
+            getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable();
 
         Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage);
 
-        createTxFuture.onComplete(this, getActorContext().getClientDispatcher());
+        createTxFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                onCreateTransactionComplete(failure, response);
+            }
+        }, getActorContext().getClientDispatcher());
     }
 
-    @Override
-    public void onComplete(Throwable failure, Object response) {
+    private void onCreateTransactionComplete(Throwable failure, Object response) {
         if(failure instanceof NoShardLeaderException) {
             // There's no leader for the shard yet - schedule and try again, unless we're out
             // of retries. Note: createTxTries is volatile as it may be written by different
@@ -188,12 +149,7 @@ final class TransactionFutureCallback extends OnComplete<Object> {
         createTransactionContext(failure, response);
     }
 
-    void createTransactionContext(Throwable failure, Object response) {
-        // Mainly checking for state violation here to perform a volatile read of "initialized" to
-        // ensure updates to operationLimter et al are visible to this thread (ie we're doing
-        // "piggy-back" synchronization here).
-        proxy.ensureInitializied();
-
+    private void createTransactionContext(Throwable failure, Object response) {
         // Create the TransactionContext from the response or failure. Store the new
         // TransactionContext locally until we've completed invoking the
         // TransactionOperations. This avoids thread timing issues which could cause
@@ -217,47 +173,33 @@ final class TransactionFutureCallback extends OnComplete<Object> {
             localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
         }
 
-        executeTxOperatonsOnComplete(localTransactionContext);
-    }
-
-    private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
-        while(true) {
-            // Access to txOperationsOnComplete and transactionContext must be protected and atomic
-            // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
-            // issues and ensure no TransactionOperation is missed and that they are processed
-            // in the order they occurred.
-
-            // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
-            // in case a TransactionOperation results in another transaction operation being
-            // queued (eg a put operation from a client read Future callback that is notified
-            // synchronously).
-            Collection<TransactionOperation> operationsBatch = null;
-            synchronized(txOperationsOnComplete) {
-                if(txOperationsOnComplete.isEmpty()) {
-                    // We're done invoking the TransactionOperations so we can now publish the
-                    // TransactionContext.
-                    transactionContext = localTransactionContext;
-                    break;
-                }
-
-                operationsBatch = new ArrayList<>(txOperationsOnComplete);
-                txOperationsOnComplete.clear();
-            }
-
-            // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
-            // A slight down-side is that we need to re-acquire the lock below but this should
-            // be negligible.
-            for(TransactionOperation oper: operationsBatch) {
-                oper.invoke(localTransactionContext);
-            }
-        }
+        transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
     }
 
     private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
         LOG.debug("Tx {} Received {}", getIdentifier(), reply);
 
-        return proxy.createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
+        return createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
                 reply.getTransactionPath(), reply.getVersion());
     }
 
-}
\ No newline at end of file
+    private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath,
+            short remoteTransactionVersion) {
+        // TxActor is always created where the leader of the shard is.
+        // Check if TxActor is created in the same node
+        boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath);
+        final TransactionContext ret;
+
+        if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
+            ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
+                getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+        } else {
+            ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(),
+                isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+        }
+
+        TransactionContextCleanup.track(this, ret);
+        return ret;
+    }
+}
+