From: Tom Pantelis Date: Mon, 8 Feb 2016 21:50:23 +0000 (-0500) Subject: Bug 4823: Use tx commit timeout for BatchedModifications X-Git-Tag: release/boron~364 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=705a9efb2e158cbaf852077fe57167fddc646311;ds=sidebyside Bug 4823: Use tx commit timeout for BatchedModifications When sending BatchedModifications messages to the shard we use the general operation timeout which is 5 sec. We should instead use the transaction commit timeout to be consistent with the other transaction messages (ReadyLocalTransaction, CanCommitTransaction etc). Change-Id: I26bead59c29ee198f677838c5adb3614e3795a04 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index 3f821c74fb..29783a6e7d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; +import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; @@ -62,8 +63,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext { return actorContext; } - protected Future executeOperationAsync(SerializableMessage msg) { - return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable())); + protected Future executeOperationAsync(SerializableMessage msg, Timeout timeout) { + return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable(), timeout)); } @Override @@ -141,7 +142,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { batchedModifications.setReady(ready); batchedModifications.setDoCommitOnReady(doCommitOnReady); batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent); - sent = executeOperationAsync(batchedModifications); + sent = executeOperationAsync(batchedModifications, actorContext.getTransactionCommitOperationTimeout()); if(ready) { batchedModifications = null; @@ -196,7 +197,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } }; - Future future = executeOperationAsync(readCmd.asVersion(getTransactionVersion())); + Future future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()), + actorContext.getOperationTimeout()); future.onComplete(onComplete, actorContext.getClientDispatcher()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index e502c0d428..e45604b5fe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -255,7 +255,7 @@ public abstract class AbstractTransactionProxyTest { protected void expectBatchedModifications(ActorRef actorRef, int count) { doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); } protected void expectBatchedModificationsReady(ActorRef actorRef) { @@ -265,22 +265,22 @@ public abstract class AbstractTransactionProxyTest { protected void expectBatchedModificationsReady(ActorRef actorRef, boolean doCommitOnReady) { doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) : readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); } protected void expectBatchedModifications(int count) { doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); + any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class)); } protected void expectIncompleteBatchedModifications() { doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); + any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class)); } protected void expectFailedBatchedModifications(ActorRef actorRef) { doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); } protected void expectReadyLocalTransaction(ActorRef actorRef, boolean doCommitOnReady) { @@ -387,7 +387,7 @@ public abstract class AbstractTransactionProxyTest { ArgumentCaptor batchedModificationsCaptor = ArgumentCaptor.forClass(BatchedModifications.class); verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync( - eq(actorSelection(actorRef)), batchedModificationsCaptor.capture()); + eq(actorSelection(actorRef)), batchedModificationsCaptor.capture(), any(Timeout.class)); List batchedModifications = filterCaptured( batchedModificationsCaptor, BatchedModifications.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index 66b8c0ec8d..ed6a731162 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -128,7 +128,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { Promise batchedReplyPromise1 = akka.dispatch.Futures.promise(); doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(txActorRef1)), isA(BatchedModifications.class)); + eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class)); DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction(); @@ -198,7 +198,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { Promise readyReplyPromise1 = akka.dispatch.Futures.promise(); doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(txActorRef1)), isA(BatchedModifications.class)); + eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class)); DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 8f30331550..31cde96db6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -92,7 +92,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); Optional> readOptional = transactionProxy.read( TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -102,7 +102,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -116,7 +116,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqReadData()); + executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); @@ -128,7 +128,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqReadData()); + executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); @@ -188,7 +188,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(actorRef, 1); doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); @@ -202,10 +202,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { InOrder inOrder = Mockito.inOrder(mockActorContext); inOrder.verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); inOrder.verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); } @Test(expected=IllegalStateException.class) @@ -240,14 +240,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class)); Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); assertEquals("Exists response", false, exists); doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class)); exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); @@ -269,7 +269,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists()); + executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); @@ -281,7 +281,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists()); + executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); @@ -297,7 +297,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(actorRef, 1); doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); @@ -309,10 +309,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { InOrder inOrder = Mockito.inOrder(mockActorContext); inOrder.verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); inOrder.verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class)); } @Test(expected=IllegalStateException.class) @@ -347,7 +347,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class)); doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); expectBatchedModificationsReady(actorRef); @@ -443,7 +443,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); expectBatchedModifications(actorRef, 1); @@ -471,7 +471,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); expectBatchedModificationsReady(actorRef, true); @@ -501,7 +501,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); expectBatchedModificationsReady(actorRef, true); @@ -729,7 +729,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk"); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class)); + executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class), + any(Timeout.class)); expectBatchedModificationsReady(actorRef2); @@ -761,7 +762,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); @@ -1164,7 +1165,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqReadData()); + any(ActorSelection.class), eqReadData(), any(Timeout.class)); transactionProxy.read(TestModel.TEST_PATH); @@ -1240,7 +1241,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Override public void run(TransactionProxy transactionProxy) { doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqDataExists()); + any(ActorSelection.class), eqDataExists(), any(Timeout.class)); transactionProxy.exists(TestModel.TEST_PATH); @@ -1413,13 +1414,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH; doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData(writePath2)); + eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class)); doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData(mergePath2)); + eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class)); doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); @@ -1458,22 +1459,22 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { InOrder inOrder = Mockito.inOrder(mockActorContext); inOrder.verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); inOrder.verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData(writePath2)); + eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class)); inOrder.verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); inOrder.verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData(mergePath2)); + eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class)); inOrder.verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class)); inOrder.verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class)); } @Test @@ -1545,6 +1546,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class)); doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.builder().build())); + eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.builder().build()), any(Timeout.class)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronTransactionProxyTest.java index 46f55f1760..0bf4f48cea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreBoronTransactionProxyTest.java @@ -140,7 +140,7 @@ public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqLegacySerializedReadData(TestModel.TEST_PATH)); + eq(actorSelection(actorRef)), eqLegacySerializedReadData(TestModel.TEST_PATH), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); @@ -156,7 +156,7 @@ public class PreBoronTransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqLegacySerializedDataExists()); + eq(actorSelection(actorRef)), eqLegacySerializedDataExists(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);