BUG 3249 : Operation Limiter not release on completion of operation 98/20698/2
authorMoiz Raja <moraja@cisco.com>
Mon, 18 May 2015 21:12:08 +0000 (14:12 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 19 May 2015 09:57:42 +0000 (09:57 +0000)
The OperationLimiter uses up one permit and per operation done on a
transaction. When that operation is complete the permit must be
released. For Local transactions this was not happening. Due to this
when a local transaction has more operations than the maximum outstanding
operations we would block for the operation timeout period (5 seconds
by default) this can have all sorts of adverse side effects on
transactions.

This patch ensures that for local transactions those permits are freed
up when the operation is complete.

Change-Id: Id8ba99c6ea081c7e37da0b517860dca818a91218
Signed-off-by: Moiz Raja <moraja@cisco.com>
(cherry picked from commit 7deb48eedb9aec8230c65237c48679ecca6af919)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index 78e059c798bbacea4f2e6b50588ede4d464bd8a0..a8a076f98fcee0c73919d8187ddb64ec2b88718b 100644 (file)
@@ -200,6 +200,6 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
 
     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
-        return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()));
+        return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()), parent.getCompleter());
     }
 }
index 01a778f8e43157730938490de9cd4a16ec1c8c21..dd7c9194fde2cdd95b7f72be041c977ac7ad9739 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
@@ -26,59 +27,71 @@ import scala.concurrent.Future;
  */
 final class LocalTransactionContext extends AbstractTransactionContext {
     private final DOMStoreReadWriteTransaction delegate;
+    private final OperationCompleter completer;
 
-    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate) {
+    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate, OperationCompleter completer) {
         super(identifier);
-        this.delegate = delegate;
+        this.delegate = Preconditions.checkNotNull(delegate);
+        this.completer = Preconditions.checkNotNull(completer);
     }
 
     @Override
-    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+    public void writeData(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         delegate.write(path, data);
+        completer.onComplete(null, null);
     }
 
     @Override
-    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+    public void mergeData(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         delegate.merge(path, data);
+        completer.onComplete(null, null);
     }
 
     @Override
-    public void deleteData(YangInstanceIdentifier path) {
+    public void deleteData(final YangInstanceIdentifier path) {
         delegate.delete(path);
+        completer.onComplete(null, null);
     }
 
     @Override
-    public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+    public void readData(final YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+
         Futures.addCallback(delegate.read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
             @Override
             public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
                 proxyFuture.set(result);
+                completer.onComplete(null, null);
             }
 
             @Override
             public void onFailure(Throwable t) {
                 proxyFuture.setException(t);
+                completer.onComplete(null, null);
             }
         });
     }
 
     @Override
-    public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
+    public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
         Futures.addCallback(delegate.exists(path), new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
                 proxyFuture.set(result);
+                completer.onComplete(null, null);
             }
 
             @Override
             public void onFailure(Throwable t) {
                 proxyFuture.setException(t);
+                completer.onComplete(null, null);
             }
         });
     }
 
     private LocalThreePhaseCommitCohort ready() {
-        return (LocalThreePhaseCommitCohort) delegate.ready();
+        LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) delegate.ready();
+        completer.onComplete(null, null);
+        return ready;
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
new file mode 100644 (file)
index 0000000..7ca9f90
--- /dev/null
@@ -0,0 +1,93 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.Semaphore;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class LocalTransactionContextTest {
+
+    @Mock
+    Semaphore limiter;
+
+    @Mock
+    TransactionIdentifier identifier;
+
+    @Mock
+    DOMStoreReadWriteTransaction readWriteTransaction;
+
+    LocalTransactionContext localTransactionContext;
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+        localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter));
+    }
+
+    @Test
+    public void testWrite(){
+        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+        NormalizedNode normalizedNode = mock(NormalizedNode.class);
+        localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
+        verify(limiter).release();
+        verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
+    }
+
+    @Test
+    public void testMerge(){
+        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+        NormalizedNode normalizedNode = mock(NormalizedNode.class);
+        localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
+        verify(limiter).release();
+        verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
+    }
+
+    @Test
+    public void testDelete(){
+        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+        localTransactionContext.deleteData(yangInstanceIdentifier);
+        verify(limiter).release();
+        verify(readWriteTransaction).delete(yangInstanceIdentifier);
+    }
+
+
+    @Test
+    public void testRead(){
+        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+        NormalizedNode normalizedNode = mock(NormalizedNode.class);
+        doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
+        localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+        verify(limiter).release();
+        verify(readWriteTransaction).read(yangInstanceIdentifier);
+    }
+
+    @Test
+    public void testExists(){
+        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+        doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
+        localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
+        verify(limiter).release();
+        verify(readWriteTransaction).exists(yangInstanceIdentifier);
+    }
+
+    @Test
+    public void testReady(){
+        doReturn(mock(LocalThreePhaseCommitCohort.class)).when(readWriteTransaction).ready();
+        localTransactionContext.readyTransaction();
+        verify(limiter).release();
+        verify(readWriteTransaction).ready();
+    }
+
+
+}
\ No newline at end of file
index 6cf63157e16ff7f37033ee3142f4ca1c06e84df8..f19bace5b99ce21862a04455241cf913a34fc350 100644 (file)
@@ -59,6 +59,8 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Promise;
@@ -781,6 +783,11 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.<DataTree>absent());
     }
 
+    private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
+        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), dataTreeOptional);
+    }
+
+
     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
@@ -871,6 +878,73 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 expected, (end-start)), (end - start) <= expected);
     }
 
+    private void completeOperationLocal(TransactionProxyOperation operation, Optional<DataTree> dataTreeOptional){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTreeOptional))).
+                when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+        long start = System.nanoTime();
+
+        operation.run(transactionProxy);
+
+        long end = System.nanoTime();
+
+        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+                expected, (end-start)), (end - start) <= expected);
+    }
+
+    private Optional<DataTree> createDataTree(){
+        DataTree dataTree = mock(DataTree.class);
+        Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
+        DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
+        DataTreeModification dataTreeModification = mock(DataTreeModification.class);
+
+        doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
+        doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
+
+        return dataTreeOptional;
+    }
+
+    private Optional<DataTree> createDataTree(NormalizedNode readResponse){
+        DataTree dataTree = mock(DataTree.class);
+        Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
+        DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
+        DataTreeModification dataTreeModification = mock(DataTreeModification.class);
+
+        doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
+        doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
+        doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
+
+        return dataTreeOptional;
+    }
+
+
+    @Test
+    public void testWriteCompletionForLocalShard(){
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+            }
+        }, createDataTree());
+    }
+
     @Test
     public void testWriteThrottlingWhenShardFound(){
         dataStoreContextBuilder.shardBatchedModificationCount(1);
@@ -977,6 +1051,23 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     }
 
+    @Test
+    public void testMergeCompletionForLocalShard(){
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+            }
+        }, createDataTree());
+    }
+
+
     @Test
     public void testDeleteThrottlingWhenShardFound(){
 
@@ -1008,6 +1099,21 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         }, false);
     }
 
+    @Test
+    public void testDeleteCompletionForLocalShard(){
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        }, createDataTree());
+
+    }
+
     @Test
     public void testDeleteCompletion(){
         dataStoreContextBuilder.shardBatchedModificationCount(1);
@@ -1075,6 +1181,33 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     }
 
+    @Test
+    public void testReadCompletionForLocalShard(){
+        final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        }, createDataTree(nodeToRead));
+
+    }
+
+    @Test
+    public void testReadCompletionForLocalShardWhenExceptionOccurs(){
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        }, createDataTree());
+
+    }
+
     @Test
     public void testExistsThrottlingWhenShardFound(){
 
@@ -1124,6 +1257,32 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     }
 
+    @Test
+    public void testExistsCompletionForLocalShard(){
+        final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        }, createDataTree(nodeToRead));
+
+    }
+
+    @Test
+    public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        }, createDataTree());
+
+    }
     @Test
     public void testReadyThrottling(){