From 95538aa1baf80a90f03e4ae6d8268c9db34b3bfa Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Mon, 18 May 2015 14:12:08 -0700 Subject: [PATCH] BUG 3249 : Operation Limiter not release on completion of operation The OperationLimiter uses up one permit and per operation done on a transaction. When that operation is complete the permit must be released. For Local transactions this was not happening. Due to this when a local transaction has more operations than the maximum outstanding operations we would block for the operation timeout period (5 seconds by default) this can have all sorts of adverse side effects on transactions. This patch ensures that for local transactions those permits are freed up when the operation is complete. Change-Id: Id8ba99c6ea081c7e37da0b517860dca818a91218 Signed-off-by: Moiz Raja (cherry picked from commit 7deb48eedb9aec8230c65237c48679ecca6af919) --- .../AbstractTransactionContextFactory.java | 2 +- .../datastore/LocalTransactionContext.java | 29 +++- .../LocalTransactionContextTest.java | 93 ++++++++++ .../datastore/TransactionProxyTest.java | 159 ++++++++++++++++++ 4 files changed, 274 insertions(+), 9 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java index 78e059c798..a8a076f98f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -200,6 +200,6 @@ abstract class AbstractTransactionContextFactory void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection> cohortFutures); private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) { - return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier())); + return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()), parent.getCompleter()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java index 01a778f8e4..dd7c9194fd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; @@ -26,59 +27,71 @@ import scala.concurrent.Future; */ final class LocalTransactionContext extends AbstractTransactionContext { private final DOMStoreReadWriteTransaction delegate; + private final OperationCompleter completer; - LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate) { + LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate, OperationCompleter completer) { super(identifier); - this.delegate = delegate; + this.delegate = Preconditions.checkNotNull(delegate); + this.completer = Preconditions.checkNotNull(completer); } @Override - public void writeData(YangInstanceIdentifier path, NormalizedNode data) { + public void writeData(final YangInstanceIdentifier path, final NormalizedNode data) { delegate.write(path, data); + completer.onComplete(null, null); } @Override - public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { + public void mergeData(final YangInstanceIdentifier path, final NormalizedNode data) { delegate.merge(path, data); + completer.onComplete(null, null); } @Override - public void deleteData(YangInstanceIdentifier path) { + public void deleteData(final YangInstanceIdentifier path) { delegate.delete(path); + completer.onComplete(null, null); } @Override - public void readData(YangInstanceIdentifier path, final SettableFuture>> proxyFuture) { + public void readData(final YangInstanceIdentifier path, final SettableFuture>> proxyFuture) { + Futures.addCallback(delegate.read(path), new FutureCallback>>() { @Override public void onSuccess(Optional> result) { proxyFuture.set(result); + completer.onComplete(null, null); } @Override public void onFailure(Throwable t) { proxyFuture.setException(t); + completer.onComplete(null, null); } }); } @Override - public void dataExists(YangInstanceIdentifier path, final SettableFuture proxyFuture) { + public void dataExists(final YangInstanceIdentifier path, final SettableFuture proxyFuture) { Futures.addCallback(delegate.exists(path), new FutureCallback() { @Override public void onSuccess(Boolean result) { proxyFuture.set(result); + completer.onComplete(null, null); } @Override public void onFailure(Throwable t) { proxyFuture.setException(t); + completer.onComplete(null, null); } }); } private LocalThreePhaseCommitCohort ready() { - return (LocalThreePhaseCommitCohort) delegate.ready(); + LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) delegate.ready(); + completer.onComplete(null, null); + return ready; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java new file mode 100644 index 0000000000..7ca9f90a2c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java @@ -0,0 +1,93 @@ +package org.opendaylight.controller.cluster.datastore; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import java.util.concurrent.Semaphore; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public class LocalTransactionContextTest { + + @Mock + Semaphore limiter; + + @Mock + TransactionIdentifier identifier; + + @Mock + DOMStoreReadWriteTransaction readWriteTransaction; + + LocalTransactionContext localTransactionContext; + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter)); + } + + @Test + public void testWrite(){ + YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); + NormalizedNode normalizedNode = mock(NormalizedNode.class); + localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode); + verify(limiter).release(); + verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); + } + + @Test + public void testMerge(){ + YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); + NormalizedNode normalizedNode = mock(NormalizedNode.class); + localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode); + verify(limiter).release(); + verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); + } + + @Test + public void testDelete(){ + YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); + localTransactionContext.deleteData(yangInstanceIdentifier); + verify(limiter).release(); + verify(readWriteTransaction).delete(yangInstanceIdentifier); + } + + + @Test + public void testRead(){ + YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); + NormalizedNode normalizedNode = mock(NormalizedNode.class); + doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier); + localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.>>create()); + verify(limiter).release(); + verify(readWriteTransaction).read(yangInstanceIdentifier); + } + + @Test + public void testExists(){ + YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); + doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier); + localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture. create()); + verify(limiter).release(); + verify(readWriteTransaction).exists(yangInstanceIdentifier); + } + + @Test + public void testReady(){ + doReturn(mock(LocalThreePhaseCommitCohort.class)).when(readWriteTransaction).ready(); + localTransactionContext.readyTransaction(); + verify(limiter).release(); + verify(readWriteTransaction).ready(); + } + + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 6cf63157e1..f19bace5b9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -59,6 +59,8 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Promise; @@ -781,6 +783,11 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.absent()); } + private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional dataTreeOptional){ + return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), dataTreeOptional); + } + + private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){ ActorSystem actorSystem = getSystem(); ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); @@ -871,6 +878,73 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expected, (end-start)), (end - start) <= expected); } + private void completeOperationLocal(TransactionProxyOperation operation, Optional dataTreeOptional){ + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTreeOptional))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); + + long start = System.nanoTime(); + + operation.run(transactionProxy); + + long end = System.nanoTime(); + + long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()); + Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", + expected, (end-start)), (end - start) <= expected); + } + + private Optional createDataTree(){ + DataTree dataTree = mock(DataTree.class); + Optional dataTreeOptional = Optional.of(dataTree); + DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class); + DataTreeModification dataTreeModification = mock(DataTreeModification.class); + + doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot(); + doReturn(dataTreeModification).when(dataTreeSnapshot).newModification(); + + return dataTreeOptional; + } + + private Optional createDataTree(NormalizedNode readResponse){ + DataTree dataTree = mock(DataTree.class); + Optional dataTreeOptional = Optional.of(dataTree); + DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class); + DataTreeModification dataTreeModification = mock(DataTreeModification.class); + + doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot(); + doReturn(dataTreeModification).when(dataTreeSnapshot).newModification(); + doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class)); + + return dataTreeOptional; + } + + + @Test + public void testWriteCompletionForLocalShard(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); + completeOperationLocal(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + } + }, createDataTree()); + } + @Test public void testWriteThrottlingWhenShardFound(){ dataStoreContextBuilder.shardBatchedModificationCount(1); @@ -977,6 +1051,23 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } + @Test + public void testMergeCompletionForLocalShard(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); + completeOperationLocal(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + } + }, createDataTree()); + } + + @Test public void testDeleteThrottlingWhenShardFound(){ @@ -1008,6 +1099,21 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { }, false); } + @Test + public void testDeleteCompletionForLocalShard(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); + completeOperationLocal(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + + transactionProxy.delete(TestModel.TEST_PATH); + + transactionProxy.delete(TestModel.TEST_PATH); + } + }, createDataTree()); + + } + @Test public void testDeleteCompletion(){ dataStoreContextBuilder.shardBatchedModificationCount(1); @@ -1075,6 +1181,33 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } + @Test + public void testReadCompletionForLocalShard(){ + final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + completeOperationLocal(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.read(TestModel.TEST_PATH); + } + }, createDataTree(nodeToRead)); + + } + + @Test + public void testReadCompletionForLocalShardWhenExceptionOccurs(){ + completeOperationLocal(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.read(TestModel.TEST_PATH); + } + }, createDataTree()); + + } + @Test public void testExistsThrottlingWhenShardFound(){ @@ -1124,6 +1257,32 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } + @Test + public void testExistsCompletionForLocalShard(){ + final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + completeOperationLocal(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + transactionProxy.exists(TestModel.TEST_PATH); + + transactionProxy.exists(TestModel.TEST_PATH); + } + }, createDataTree(nodeToRead)); + + } + + @Test + public void testExistsCompletionForLocalShardWhenExceptionOccurs(){ + completeOperationLocal(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + transactionProxy.exists(TestModel.TEST_PATH); + + transactionProxy.exists(TestModel.TEST_PATH); + } + }, createDataTree()); + + } @Test public void testReadyThrottling(){ -- 2.36.6