Bug 2260: Reduce overhead of unused TransactionProxy instances
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 58b37be2a2727babd9b0305e868d85d90c079052..0bc82af3358c4747901fab005db5c19cf2d41288 100644 (file)
@@ -18,9 +18,11 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -162,7 +164,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
      * PhantomReference.
      */
     private List<ActorSelection> remoteTransactionActors;
-    private AtomicBoolean remoteTransactionActorsMB;
+    private volatile AtomicBoolean remoteTransactionActorsMB;
 
     /**
      * Stores the create transaction results per shard.
@@ -175,8 +177,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
-    private final Semaphore operationLimiter;
-    private final OperationCompleter operationCompleter;
+
+    private volatile boolean initialized;
+    private Semaphore operationLimiter;
+    private OperationCompleter operationCompleter;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
         this(actorContext, transactionType, "");
@@ -197,25 +201,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             memberName = "UNKNOWN-MEMBER";
         }
 
-        this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
-            counter.getAndIncrement()).build();
-
-        if(transactionType == TransactionType.READ_ONLY) {
-            // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
-            // to close the remote Tx's when this instance is no longer in use and is garbage
-            // collected.
-
-            remoteTransactionActors = Lists.newArrayList();
-            remoteTransactionActorsMB = new AtomicBoolean();
-
-            TransactionProxyCleanupPhantomReference cleanup =
-                new TransactionProxyCleanupPhantomReference(this);
-            phantomReferenceCache.put(cleanup, cleanup);
-        }
-
-        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-        this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
-        this.operationCompleter = new OperationCompleter(operationLimiter);
+        this.identifier = new TransactionIdentifier(memberName, counter.getAndIncrement());
 
         LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
     }
@@ -303,6 +289,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     private void throttleOperation(int acquirePermits) {
+        if(!initialized) {
+            // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+            operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
+            operationCompleter = new OperationCompleter(operationLimiter);
+
+            // Make sure we write this last because it's volatile and will also publish the non-volatile writes
+            // above as well so they'll be visible to other threads.
+            initialized = true;
+        }
+
         try {
             if(!operationLimiter.tryAcquire(acquirePermits,
                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
@@ -377,13 +373,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        throttleOperation(txFutureCallbackMap.size());
-
         inReadyState = true;
 
         LOG.debug("Tx {} Readying {} transactions for commit", identifier,
                     txFutureCallbackMap.size());
 
+        if(txFutureCallbackMap.size() == 0) {
+            onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
+            return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
+        }
+
+        throttleOperation(txFutureCallbackMap.size());
+
         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
 
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
@@ -454,7 +455,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         txFutureCallbackMap.clear();
 
-        if(transactionType == TransactionType.READ_ONLY) {
+        if(remoteTransactionActorsMB != null) {
             remoteTransactionActors.clear();
             remoteTransactionActorsMB.set(true);
         }
@@ -627,6 +628,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             }
 
+            // Mainly checking for state violation here to perform a volatile read of "initialized" to
+            // ensure updates to operationLimter et al are visible to this thread (ie we're doing
+            // "piggy-back" synchronization here).
+            Preconditions.checkState(initialized, "Tx was not propertly initialized.");
+
             // Create the TransactionContext from the response or failure. Store the new
             // TransactionContext locally until we've completed invoking the
             // TransactionOperations. This avoids thread timing issues which could cause
@@ -695,6 +701,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
 
             if (transactionType == TransactionType.READ_ONLY) {
+                // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+                // to close the remote Tx's when this instance is no longer in use and is garbage
+                // collected.
+
+                if(remoteTransactionActorsMB == null) {
+                    remoteTransactionActors = Lists.newArrayList();
+                    remoteTransactionActorsMB = new AtomicBoolean();
+
+                    TransactionProxyCleanupPhantomReference cleanup =
+                            new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
+                    phantomReferenceCache.put(cleanup, cleanup);
+                }
+
                 // Add the actor to the remoteTransactionActors list for access by the
                 // cleanup PhantonReference.
                 remoteTransactionActors.add(transactionActor);
@@ -717,4 +736,36 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
     }
+
+    private static class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+        static NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
+
+        private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS =
+                com.google.common.util.concurrent.Futures.immediateFuture(null);
+        private static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS =
+                com.google.common.util.concurrent.Futures.immediateFuture(Boolean.TRUE);
+
+        private NoOpDOMStoreThreePhaseCommitCohort() {
+        }
+
+        @Override
+        public ListenableFuture<Boolean> canCommit() {
+            return IMMEDIATE_BOOLEAN_SUCCESS;
+        }
+
+        @Override
+        public ListenableFuture<Void> preCommit() {
+            return IMMEDIATE_VOID_SUCCESS;
+        }
+
+        @Override
+        public ListenableFuture<Void> abort() {
+            return IMMEDIATE_VOID_SUCCESS;
+        }
+
+        @Override
+        public ListenableFuture<Void> commit() {
+            return IMMEDIATE_VOID_SUCCESS;
+        }
+    }
 }