From: Tom Pantelis Date: Sun, 1 Mar 2015 02:21:10 +0000 (-0500) Subject: Bug 2260: Reduce overhead of unused TransactionProxy instances X-Git-Tag: release/lithium~436^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=3bd5e35d7d22058d92c1441b6cdca6dd19ea6168 Bug 2260: Reduce overhead of unused TransactionProxy instances Moved constructon of the operationLimiter and operationCompleter to the throtteOperation method lazily. Added a volatile initialized boolean for the lazy check and to "piggy-back" synchronize the updates to operationLimiter and operationCompleter as well. In TransactionProxy#ready, if no operations were performed then the txFutureCallbackMap will be empty so returned a static NoOpDOMStoreThreePhaseCommitCohort in that case. The only remaining overhead in the constructor is the TransactionIdentifier instance. The Builder seemed extraneous here so I removed it to reduce the overhead from 2 object allocations to 1. Change-Id: I083d4176c880055938bbf1f21c00da3859d66af6 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 58b37be2a2..0bc82af335 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -18,9 +18,11 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -162,7 +164,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * PhantomReference. */ private List remoteTransactionActors; - private AtomicBoolean remoteTransactionActorsMB; + private volatile AtomicBoolean remoteTransactionActorsMB; /** * Stores the create transaction results per shard. @@ -175,8 +177,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final String transactionChainId; private final SchemaContext schemaContext; private boolean inReadyState; - private final Semaphore operationLimiter; - private final OperationCompleter operationCompleter; + + private volatile boolean initialized; + private Semaphore operationLimiter; + private OperationCompleter operationCompleter; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { this(actorContext, transactionType, ""); @@ -197,25 +201,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { memberName = "UNKNOWN-MEMBER"; } - this.identifier = TransactionIdentifier.builder().memberName(memberName).counter( - counter.getAndIncrement()).build(); - - if(transactionType == TransactionType.READ_ONLY) { - // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference - // to close the remote Tx's when this instance is no longer in use and is garbage - // collected. - - remoteTransactionActors = Lists.newArrayList(); - remoteTransactionActorsMB = new AtomicBoolean(); - - TransactionProxyCleanupPhantomReference cleanup = - new TransactionProxyCleanupPhantomReference(this); - phantomReferenceCache.put(cleanup, cleanup); - } - - // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem - this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); - this.operationCompleter = new OperationCompleter(operationLimiter); + this.identifier = new TransactionIdentifier(memberName, counter.getAndIncrement()); LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId); } @@ -303,6 +289,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } private void throttleOperation(int acquirePermits) { + if(!initialized) { + // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem + operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); + operationCompleter = new OperationCompleter(operationLimiter); + + // Make sure we write this last because it's volatile and will also publish the non-volatile writes + // above as well so they'll be visible to other threads. + initialized = true; + } + try { if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ @@ -377,13 +373,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { checkModificationState(); - throttleOperation(txFutureCallbackMap.size()); - inReadyState = true; LOG.debug("Tx {} Readying {} transactions for commit", identifier, txFutureCallbackMap.size()); + if(txFutureCallbackMap.size() == 0) { + onTransactionReady(Collections.>emptyList()); + return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + } + + throttleOperation(txFutureCallbackMap.size()); + List> cohortFutures = Lists.newArrayList(); for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { @@ -454,7 +455,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { txFutureCallbackMap.clear(); - if(transactionType == TransactionType.READ_ONLY) { + if(remoteTransactionActorsMB != null) { remoteTransactionActors.clear(); remoteTransactionActorsMB.set(true); } @@ -627,6 +628,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } + // Mainly checking for state violation here to perform a volatile read of "initialized" to + // ensure updates to operationLimter et al are visible to this thread (ie we're doing + // "piggy-back" synchronization here). + Preconditions.checkState(initialized, "Tx was not propertly initialized."); + // Create the TransactionContext from the response or failure. Store the new // TransactionContext locally until we've completed invoking the // TransactionOperations. This avoids thread timing issues which could cause @@ -695,6 +701,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ActorSelection transactionActor = actorContext.actorSelection(transactionPath); if (transactionType == TransactionType.READ_ONLY) { + // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference + // to close the remote Tx's when this instance is no longer in use and is garbage + // collected. + + if(remoteTransactionActorsMB == null) { + remoteTransactionActors = Lists.newArrayList(); + remoteTransactionActorsMB = new AtomicBoolean(); + + TransactionProxyCleanupPhantomReference cleanup = + new TransactionProxyCleanupPhantomReference(TransactionProxy.this); + phantomReferenceCache.put(cleanup, cleanup); + } + // Add the actor to the remoteTransactionActors list for access by the // cleanup PhantonReference. remoteTransactionActors.add(transactionActor); @@ -717,4 +736,36 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } } + + private static class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort { + static NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort(); + + private static final ListenableFuture IMMEDIATE_VOID_SUCCESS = + com.google.common.util.concurrent.Futures.immediateFuture(null); + private static final ListenableFuture IMMEDIATE_BOOLEAN_SUCCESS = + com.google.common.util.concurrent.Futures.immediateFuture(Boolean.TRUE); + + private NoOpDOMStoreThreePhaseCommitCohort() { + } + + @Override + public ListenableFuture canCommit() { + return IMMEDIATE_BOOLEAN_SUCCESS; + } + + @Override + public ListenableFuture preCommit() { + return IMMEDIATE_VOID_SUCCESS; + } + + @Override + public ListenableFuture abort() { + return IMMEDIATE_VOID_SUCCESS; + } + + @Override + public ListenableFuture commit() { + return IMMEDIATE_VOID_SUCCESS; + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java index ba2e27c69f..32637a578e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java @@ -11,19 +11,17 @@ package org.opendaylight.controller.cluster.datastore.identifiers; import com.google.common.base.Preconditions; public class TransactionIdentifier { + private static final String TX_SEPARATOR = "-txn-"; + private final String memberName; private final long counter; - + private String stringRepresentation; public TransactionIdentifier(String memberName, long counter) { this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null"); this.counter = counter; } - public static Builder builder(){ - return new Builder(); - } - @Override public boolean equals(Object o) { if (this == o) { @@ -52,29 +50,13 @@ public class TransactionIdentifier { return result; } - @Override public String toString() { - final StringBuilder sb = - new StringBuilder(); - sb.append(memberName).append("-txn-").append(counter); - return sb.toString(); - } - - public static class Builder { - private String memberName; - private long counter; - - public TransactionIdentifier build(){ - return new TransactionIdentifier(memberName, counter); - } - - public Builder memberName(String memberName){ - this.memberName = memberName; - return this; + @Override + public String toString() { + if(stringRepresentation == null) { + stringRepresentation = new StringBuilder(memberName.length() + TX_SEPARATOR.length() + 10). + append(memberName).append(TX_SEPARATOR).append(counter).toString(); } - public Builder counter(long counter){ - this.counter = counter; - return this; - } + return stringRepresentation; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 6573308c12..abfe7eae22 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1071,6 +1071,17 @@ public class TransactionProxyTest { verifyCohortFutures(proxy, IllegalArgumentException.class); } + @Test + public void testUnusedTransaction() throws Exception { + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertEquals("canCommit", true, ready.canCommit().get()); + ready.preCommit().get(); + ready.commit().get(); + } + @Test public void testGetIdentifier() { setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);