Bug 4823: Use tx commit timeout for BatchedModifications 98/34298/3
authorTom Pantelis <tpanteli@brocade.com>
Mon, 8 Feb 2016 21:50:23 +0000 (16:50 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 16 Feb 2016 09:49:03 +0000 (09:49 +0000)
When sending BatchedModifications messages to the shard we use the
general operation timeout which is 5 sec. We should instead use the
transaction commit timeout to be consistent with the other transaction
messages (ReadyLocalTransaction, CanCommitTransaction etc).

Change-Id: I26bead59c29ee198f677838c5adb3614e3795a04
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronTransactionProxyTest.java

index 3f821c7..29783a6 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
+import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
@@ -62,8 +63,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         return actorContext;
     }
 
-    protected Future<Object> executeOperationAsync(SerializableMessage msg) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable()));
+    protected Future<Object> executeOperationAsync(SerializableMessage msg, Timeout timeout) {
+        return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable(), timeout));
     }
 
     @Override
@@ -141,7 +142,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             batchedModifications.setReady(ready);
             batchedModifications.setDoCommitOnReady(doCommitOnReady);
             batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
-            sent = executeOperationAsync(batchedModifications);
+            sent = executeOperationAsync(batchedModifications, actorContext.getTransactionCommitOperationTimeout());
 
             if(ready) {
                 batchedModifications = null;
@@ -196,7 +197,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             }
         };
 
-        Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()));
+        Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()),
+                actorContext.getOperationTimeout());
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
index e502c0d..e45604b 100644 (file)
@@ -255,7 +255,7 @@ public abstract class AbstractTransactionProxyTest {
 
     protected void expectBatchedModifications(ActorRef actorRef, int count) {
         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
     }
 
     protected void expectBatchedModificationsReady(ActorRef actorRef) {
@@ -265,22 +265,22 @@ public abstract class AbstractTransactionProxyTest {
     protected void expectBatchedModificationsReady(ActorRef actorRef, boolean doCommitOnReady) {
         doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
             readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                    eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+                    eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
     }
 
     protected void expectBatchedModifications(int count) {
         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class));
+                any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
     }
 
     protected void expectIncompleteBatchedModifications() {
         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class));
+                any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
     }
 
     protected void expectFailedBatchedModifications(ActorRef actorRef) {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
     }
 
     protected void expectReadyLocalTransaction(ActorRef actorRef, boolean doCommitOnReady) {
@@ -387,7 +387,7 @@ public abstract class AbstractTransactionProxyTest {
         ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
                 ArgumentCaptor.forClass(BatchedModifications.class);
         verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
-                eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
+                eq(actorSelection(actorRef)), batchedModificationsCaptor.capture(), any(Timeout.class));
 
         List<BatchedModifications> batchedModifications = filterCaptured(
                 batchedModificationsCaptor, BatchedModifications.class);
index 66b8c0e..ed6a731 100644 (file)
@@ -128,7 +128,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
             Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
             doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
-                    eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
+                    eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
 
             DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
 
@@ -198,7 +198,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
             Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
             doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
-                    eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
+                    eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
 
             DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
 
index 8f30331..31cde96 100644 (file)
@@ -92,7 +92,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData());
+                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
@@ -102,7 +102,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData());
+                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
 
         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
@@ -116,7 +116,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqReadData());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -128,7 +128,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqReadData());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -188,7 +188,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         expectBatchedModifications(actorRef, 1);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData());
+                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -202,10 +202,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         InOrder inOrder = Mockito.inOrder(mockActorContext);
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData());
+                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
     }
 
     @Test(expected=IllegalStateException.class)
@@ -240,14 +240,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists());
+                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", false, exists);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists());
+                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
 
         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
@@ -269,7 +269,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -281,7 +281,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -297,7 +297,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         expectBatchedModifications(actorRef, 1);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists());
+                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -309,10 +309,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         InOrder inOrder = Mockito.inOrder(mockActorContext);
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists());
+                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
     }
 
     @Test(expected=IllegalStateException.class)
@@ -347,7 +347,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData());
+                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
 
         expectBatchedModificationsReady(actorRef);
 
@@ -443,7 +443,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData());
+                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
 
         expectBatchedModifications(actorRef, 1);
 
@@ -471,7 +471,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData());
+                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
 
         expectBatchedModificationsReady(actorRef, true);
 
@@ -501,7 +501,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData());
+                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
 
         expectBatchedModificationsReady(actorRef, true);
 
@@ -729,7 +729,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
+                executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class),
+                        any(Timeout.class));
 
         expectBatchedModificationsReady(actorRef2);
 
@@ -761,7 +762,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData());
+                eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -1164,7 +1165,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
                 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqReadData());
+                        any(ActorSelection.class), eqReadData(), any(Timeout.class));
 
                 transactionProxy.read(TestModel.TEST_PATH);
 
@@ -1240,7 +1241,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
             @Override
             public void run(TransactionProxy transactionProxy) {
                 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqDataExists());
+                        any(ActorSelection.class), eqDataExists(), any(Timeout.class));
 
                 transactionProxy.exists(TestModel.TEST_PATH);
 
@@ -1413,13 +1414,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
 
         doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(writePath2));
+                eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
 
         doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(mergePath2));
+                eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists());
+                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -1458,22 +1459,22 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         InOrder inOrder = Mockito.inOrder(mockActorContext);
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(writePath2));
+                eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(mergePath2));
+                eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
 
         inOrder.verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists());
+                eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
     }
 
     @Test
@@ -1545,6 +1546,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                         eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.builder().build()));
+                eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.builder().build()), any(Timeout.class));
     }
 }
index 46f55f1..0bf4f48 100644 (file)
@@ -140,7 +140,7 @@ public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
 
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqLegacySerializedReadData(TestModel.TEST_PATH));
+                eq(actorSelection(actorRef)), eqLegacySerializedReadData(TestModel.TEST_PATH), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
@@ -156,7 +156,7 @@ public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqLegacySerializedDataExists());
+                eq(actorSelection(actorRef)), eqLegacySerializedDataExists(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);