Bug 2260: Reduce overhead of unused TransactionProxy instances 59/15859/7
authorTom Pantelis <tpanteli@brocade.com>
Sun, 1 Mar 2015 02:21:10 +0000 (21:21 -0500)
committerMoiz Raja <moraja@cisco.com>
Mon, 9 Mar 2015 18:40:46 +0000 (18:40 +0000)
Moved constructon of the operationLimiter and operationCompleter to the
throtteOperation method lazily. Added a volatile initialized boolean for
the lazy check and to "piggy-back" synchronize the updates to
operationLimiter and operationCompleter as well.

In TransactionProxy#ready, if no operations were performed then the
txFutureCallbackMap will be empty so returned a static
NoOpDOMStoreThreePhaseCommitCohort in that case.

The only remaining overhead in the constructor is the
TransactionIdentifier instance. The Builder seemed extraneous here so I
removed it to reduce the overhead from 2 object allocations to 1.

Change-Id: I083d4176c880055938bbf1f21c00da3859d66af6
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index 58b37be2a2727babd9b0305e868d85d90c079052..0bc82af3358c4747901fab005db5c19cf2d41288 100644 (file)
@@ -18,9 +18,11 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -162,7 +164,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
      * PhantomReference.
      */
     private List<ActorSelection> remoteTransactionActors;
-    private AtomicBoolean remoteTransactionActorsMB;
+    private volatile AtomicBoolean remoteTransactionActorsMB;
 
     /**
      * Stores the create transaction results per shard.
@@ -175,8 +177,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
-    private final Semaphore operationLimiter;
-    private final OperationCompleter operationCompleter;
+
+    private volatile boolean initialized;
+    private Semaphore operationLimiter;
+    private OperationCompleter operationCompleter;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
         this(actorContext, transactionType, "");
@@ -197,25 +201,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             memberName = "UNKNOWN-MEMBER";
         }
 
-        this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
-            counter.getAndIncrement()).build();
-
-        if(transactionType == TransactionType.READ_ONLY) {
-            // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
-            // to close the remote Tx's when this instance is no longer in use and is garbage
-            // collected.
-
-            remoteTransactionActors = Lists.newArrayList();
-            remoteTransactionActorsMB = new AtomicBoolean();
-
-            TransactionProxyCleanupPhantomReference cleanup =
-                new TransactionProxyCleanupPhantomReference(this);
-            phantomReferenceCache.put(cleanup, cleanup);
-        }
-
-        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-        this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
-        this.operationCompleter = new OperationCompleter(operationLimiter);
+        this.identifier = new TransactionIdentifier(memberName, counter.getAndIncrement());
 
         LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
     }
@@ -303,6 +289,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     private void throttleOperation(int acquirePermits) {
+        if(!initialized) {
+            // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+            operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
+            operationCompleter = new OperationCompleter(operationLimiter);
+
+            // Make sure we write this last because it's volatile and will also publish the non-volatile writes
+            // above as well so they'll be visible to other threads.
+            initialized = true;
+        }
+
         try {
             if(!operationLimiter.tryAcquire(acquirePermits,
                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
@@ -377,13 +373,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        throttleOperation(txFutureCallbackMap.size());
-
         inReadyState = true;
 
         LOG.debug("Tx {} Readying {} transactions for commit", identifier,
                     txFutureCallbackMap.size());
 
+        if(txFutureCallbackMap.size() == 0) {
+            onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
+            return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
+        }
+
+        throttleOperation(txFutureCallbackMap.size());
+
         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
 
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
@@ -454,7 +455,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         txFutureCallbackMap.clear();
 
-        if(transactionType == TransactionType.READ_ONLY) {
+        if(remoteTransactionActorsMB != null) {
             remoteTransactionActors.clear();
             remoteTransactionActorsMB.set(true);
         }
@@ -627,6 +628,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             }
 
+            // Mainly checking for state violation here to perform a volatile read of "initialized" to
+            // ensure updates to operationLimter et al are visible to this thread (ie we're doing
+            // "piggy-back" synchronization here).
+            Preconditions.checkState(initialized, "Tx was not propertly initialized.");
+
             // Create the TransactionContext from the response or failure. Store the new
             // TransactionContext locally until we've completed invoking the
             // TransactionOperations. This avoids thread timing issues which could cause
@@ -695,6 +701,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
 
             if (transactionType == TransactionType.READ_ONLY) {
+                // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+                // to close the remote Tx's when this instance is no longer in use and is garbage
+                // collected.
+
+                if(remoteTransactionActorsMB == null) {
+                    remoteTransactionActors = Lists.newArrayList();
+                    remoteTransactionActorsMB = new AtomicBoolean();
+
+                    TransactionProxyCleanupPhantomReference cleanup =
+                            new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
+                    phantomReferenceCache.put(cleanup, cleanup);
+                }
+
                 // Add the actor to the remoteTransactionActors list for access by the
                 // cleanup PhantonReference.
                 remoteTransactionActors.add(transactionActor);
@@ -717,4 +736,36 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
     }
+
+    private static class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+        static NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
+
+        private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS =
+                com.google.common.util.concurrent.Futures.immediateFuture(null);
+        private static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS =
+                com.google.common.util.concurrent.Futures.immediateFuture(Boolean.TRUE);
+
+        private NoOpDOMStoreThreePhaseCommitCohort() {
+        }
+
+        @Override
+        public ListenableFuture<Boolean> canCommit() {
+            return IMMEDIATE_BOOLEAN_SUCCESS;
+        }
+
+        @Override
+        public ListenableFuture<Void> preCommit() {
+            return IMMEDIATE_VOID_SUCCESS;
+        }
+
+        @Override
+        public ListenableFuture<Void> abort() {
+            return IMMEDIATE_VOID_SUCCESS;
+        }
+
+        @Override
+        public ListenableFuture<Void> commit() {
+            return IMMEDIATE_VOID_SUCCESS;
+        }
+    }
 }
index ba2e27c69f96f55192030b5e622e660197d19847..32637a578e2d2af08c79f160bade7a9e2faf62aa 100644 (file)
@@ -11,19 +11,17 @@ package org.opendaylight.controller.cluster.datastore.identifiers;
 import com.google.common.base.Preconditions;
 
 public class TransactionIdentifier {
+    private static final String TX_SEPARATOR = "-txn-";
+
     private final String memberName;
     private final long counter;
-
+    private String stringRepresentation;
 
     public TransactionIdentifier(String memberName, long counter) {
         this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null");
         this.counter = counter;
     }
 
-    public static Builder builder(){
-        return new Builder();
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -52,29 +50,13 @@ public class TransactionIdentifier {
         return result;
     }
 
-    @Override public String toString() {
-        final StringBuilder sb =
-            new StringBuilder();
-        sb.append(memberName).append("-txn-").append(counter);
-        return sb.toString();
-    }
-
-    public static class Builder {
-        private String memberName;
-        private long counter;
-
-        public TransactionIdentifier build(){
-            return new TransactionIdentifier(memberName, counter);
-        }
-
-        public Builder memberName(String memberName){
-            this.memberName = memberName;
-            return this;
+    @Override
+    public String toString() {
+        if(stringRepresentation == null) {
+            stringRepresentation = new StringBuilder(memberName.length() + TX_SEPARATOR.length() + 10).
+                append(memberName).append(TX_SEPARATOR).append(counter).toString();
         }
 
-        public Builder counter(long counter){
-            this.counter = counter;
-            return this;
-        }
+        return stringRepresentation;
     }
 }
index 6573308c12100914badbedc5d0296b90e096a2b7..abfe7eae22a15b69fd4dc1c71df46befe5059e31 100644 (file)
@@ -1071,6 +1071,17 @@ public class TransactionProxyTest {
         verifyCohortFutures(proxy, IllegalArgumentException.class);
     }
 
+    @Test
+    public void testUnusedTransaction() throws Exception {
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertEquals("canCommit", true, ready.canCommit().get());
+        ready.preCommit().get();
+        ready.commit().get();
+    }
+
     @Test
     public void testGetIdentifier() {
         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);