From: Robert Varga Date: Tue, 19 May 2015 19:50:12 +0000 (+0200) Subject: Convert OperationCompleter to OperationLimiter X-Git-Tag: release/beryllium~481 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c7e1ddeaf842ebb696c8dd38c0ca14c925ee31a1 Convert OperationCompleter to OperationLimiter The completer and limiter functions are related to each other, so encapsulate them in a single object. Since a TransactionProxy cannot really do anything without touching the limiter, make sure we instantiate it in constructor, preventing some volatile reads in the fast path. Change-Id: I4cf31ef46c11676611a62db7a794f504712ce5af Signed-off-by: Robert Varga (cherry picked from commit b2af021ee27b2977961f0fec6f8bb1a4acbcdbd7) --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java index b50426a811..c90a3f6f6f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -178,7 +178,7 @@ abstract class AbstractTransactionContextFactory data) { incrementModificationCount(); getWriteDelegate().write(path, data); - completer.onComplete(null, null); + limiter.release(); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { incrementModificationCount(); getWriteDelegate().merge(path, data); - completer.onComplete(null, null); + limiter.release(); } @Override public void deleteData(YangInstanceIdentifier path) { incrementModificationCount(); getWriteDelegate().delete(path); - completer.onComplete(null, null); + limiter.release(); } @Override @@ -69,13 +69,13 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { @Override public void onSuccess(Optional> result) { proxyFuture.set(result); - completer.onComplete(null, null); + limiter.release(); } @Override public void onFailure(Throwable t) { proxyFuture.setException(t); - completer.onComplete(null, null); + limiter.release(); } }); } @@ -86,13 +86,13 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { @Override public void onSuccess(Boolean result) { proxyFuture.set(result); - completer.onComplete(null, null); + limiter.release(); } @Override public void onFailure(Throwable t) { proxyFuture.setException(t); - completer.onComplete(null, null); + limiter.release(); } }); } @@ -100,7 +100,7 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { private LocalThreePhaseCommitCohort ready() { logModificationCount(); LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready(); - completer.onComplete(null, null); + limiter.release(); return ready; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java index 4507168000..ff485cbab1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java @@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; -import java.util.concurrent.Semaphore; - import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException; @@ -26,9 +24,9 @@ final class NoOpTransactionContext extends AbstractTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class); private final Throwable failure; - private final Semaphore operationLimiter; + private final OperationLimiter operationLimiter; - public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter) { + public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, OperationLimiter operationLimiter) { super(identifier); this.failure = failure; this.operationLimiter = operationLimiter; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java deleted file mode 100644 index ec867dda0b..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -import akka.dispatch.OnComplete; -import com.google.common.base.Preconditions; -import java.util.concurrent.Semaphore; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; - -public final class OperationCompleter extends OnComplete { - private final Semaphore operationLimiter; - - OperationCompleter(Semaphore operationLimiter){ - this.operationLimiter = Preconditions.checkNotNull(operationLimiter); - } - - @Override - public void onComplete(Throwable throwable, Object message) { - if(message instanceof BatchedModificationsReply) { - this.operationLimiter.release(((BatchedModificationsReply)message).getNumBatched()); - } else { - this.operationLimiter.release(); - } - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java new file mode 100644 index 0000000000..b42230971b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.dispatch.OnComplete; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly + * into akka to release permits as futures complete. + */ +public class OperationLimiter extends OnComplete { + private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class); + private final TransactionIdentifier identifier; + private final long acquireTimeout; + private final Semaphore semaphore; + + OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) { + this.identifier = Preconditions.checkNotNull(identifier); + + Preconditions.checkArgument(acquireTimeoutSeconds >= 0); + this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds); + + Preconditions.checkArgument(maxPermits >= 0); + this.semaphore = new Semaphore(maxPermits); + } + + void acquire() { + acquire(1); + } + + private void acquire(final int acquirePermits) { + try { + if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) { + LOG.warn("Failed to acquire operation permit for transaction {}", identifier); + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", identifier, e); + } else { + LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier); + } + } + } + + void release() { + this.semaphore.release(); + } + + @Override + public void onComplete(final Throwable throwable, final Object message) { + if (message instanceof BatchedModificationsReply) { + this.semaphore.release(((BatchedModificationsReply)message).getNumBatched()); + } else { + this.semaphore.release(); + } + } + + public TransactionIdentifier getIdentifier() { + return identifier; + } + + @VisibleForTesting + Semaphore getSemaphore() { + return semaphore; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index 7429ca01df..6bf0f7fc9c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -46,19 +46,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext { private final boolean isTxActorLocal; private final short remoteTransactionVersion; - private final OperationCompleter operationCompleter; + private final OperationLimiter operationCompleter; private BatchedModifications batchedModifications; private int totalBatchedModificationsSent; protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, boolean isTxActorLocal, - short remoteTransactionVersion, OperationCompleter operationCompleter) { + short remoteTransactionVersion, OperationLimiter limiter) { super(identifier); this.actor = actor; this.actorContext = actorContext; this.isTxActorLocal = isTxActorLocal; this.remoteTransactionVersion = remoteTransactionVersion; - this.operationCompleter = operationCompleter; + this.operationCompleter = limiter; } private Future completeOperation(Future operationFuture){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 71d9dabf37..9cb062dc1c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; @@ -75,7 +74,7 @@ final class RemoteTransactionContextSupport { return parent.getActorContext(); } - private Semaphore getOperationLimiter() { + private OperationLimiter getOperationLimiter() { return parent.getLimiter(); } @@ -192,10 +191,10 @@ final class RemoteTransactionContextSupport { if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), - getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getCompleter()); + getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter()); } else { ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(), - isTxActorLocal, remoteTransactionVersion, parent.getCompleter()); + isTxActorLocal, remoteTransactionVersion, parent.getLimiter()); } if(parent.getType() == TransactionType.READ_ONLY) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index ea4e513942..26d7ff8b02 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -56,7 +56,7 @@ class TransactionContextWrapper { */ private void enqueueTransactionOperation(final TransactionOperation operation) { final boolean invokeOperation; - synchronized(queuedTxOperations) { + synchronized (queuedTxOperations) { if (transactionContext == null) { LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index e7a00042e4..5aafcfc88f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -55,10 +53,9 @@ public class TransactionProxy extends AbstractDOMStoreTransaction txContextAdapters = new HashMap<>(); private final AbstractTransactionContextFactory txContextFactory; + private final OperationLimiter limiter; private final TransactionType type; private TransactionState state = TransactionState.OPEN; - private volatile OperationCompleter operationCompleter; - private volatile Semaphore operationLimiter; @VisibleForTesting public TransactionProxy(final AbstractTransactionContextFactory txContextFactory, final TransactionType type) { @@ -67,6 +64,11 @@ public class TransactionProxy extends AbstractDOMStoreTransaction proxyFuture = SettableFuture.create(); TransactionContextWrapper contextAdapter = getContextAdapter(path); @@ -99,7 +101,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createSingleCommitCohort(final String shardName, final TransactionContextWrapper contextAdapter) { - throttleOperation(); + limiter.acquire(); LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName); @@ -306,7 +308,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createMultiCommitCohort( final Set> txContextAdapterEntries) { - throttleOperation(); + limiter.acquire(); final List> cohortFutures = new ArrayList<>(txContextAdapterEntries.size()); for (Entry e : txContextAdapterEntries) { LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); @@ -366,43 +368,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction normalizedNode = mock(NormalizedNode.class); localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode); verify(limiter).release(); verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); @@ -59,7 +58,7 @@ public class LocalTransactionContextTest { @Test public void testMerge(){ YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); - NormalizedNode normalizedNode = mock(NormalizedNode.class); + NormalizedNode normalizedNode = mock(NormalizedNode.class); localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode); verify(limiter).release(); verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); @@ -77,7 +76,7 @@ public class LocalTransactionContextTest { @Test public void testRead(){ YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); - NormalizedNode normalizedNode = mock(NormalizedNode.class); + NormalizedNode normalizedNode = mock(NormalizedNode.class); doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier); localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.>>create()); verify(limiter).release(); @@ -102,4 +101,4 @@ public class LocalTransactionContextTest { } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java deleted file mode 100644 index 00900d35fb..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -import static org.junit.Assert.assertEquals; -import java.util.concurrent.Semaphore; -import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; -import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; - -/** - * Unit tests for OperationCompleter. - * - * @author Thomas Pantelis - */ -public class OperationCompleterTest { - - @Test - public void testOnComplete() throws Exception { - int permits = 10; - Semaphore operationLimiter = new Semaphore(permits); - operationLimiter.acquire(permits); - int availablePermits = 0; - - OperationCompleter completer = new OperationCompleter(operationLimiter ); - - completer.onComplete(null, DataExistsReply.create(true)); - assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits()); - - completer.onComplete(null, DataExistsReply.create(true)); - assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits()); - - completer.onComplete(null, new IllegalArgumentException()); - assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits()); - - completer.onComplete(null, new BatchedModificationsReply(4)); - availablePermits += 4; - assertEquals("availablePermits", availablePermits, operationLimiter.availablePermits()); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java new file mode 100644 index 0000000000..40ce84b234 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import java.util.concurrent.Semaphore; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; +import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; + +/** + * Unit tests for OperationCompleter. + * + * @author Thomas Pantelis + */ +public class OperationLimiterTest { + + @Test + public void testOnComplete() throws Exception { + int permits = 10; + OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier("foo", 1), permits, 1); + Semaphore semaphore = limiter.getSemaphore(); + semaphore.acquire(permits); + int availablePermits = 0; + + limiter.onComplete(null, DataExistsReply.create(true)); + assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits()); + + limiter.onComplete(null, DataExistsReply.create(true)); + assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits()); + + limiter.onComplete(null, new IllegalArgumentException()); + assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits()); + + limiter.onComplete(null, new BatchedModificationsReply(4)); + availablePermits += 4; + assertEquals("availablePermits", availablePermits, semaphore.availablePermits()); + } +}