From: Tom Pantelis Date: Sun, 8 Mar 2015 02:16:37 +0000 (-0500) Subject: Modify ChainedTransactionProxy to override sending of FindPrimaryShard X-Git-Tag: release/lithium~430 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=691c47ae72532db04f9b2c33cb8a0cef642e5a17 Modify ChainedTransactionProxy to override sending of FindPrimaryShard Previously, ChainedTransactionProxy overrode the sendCreateTransaction method from TransactionProxy to intercept the sending of the CreateTransaction message in order to asynchronously wait for the previous transaction's ready Futures to complete before sending the message. To facilitate write-only transaction optimizations which will not create a separate transaction actor, we need to intercept the FindPrimaryShard message instead. Thus a new overridden method, sendFindPrimaryShardAsync, was added the same as sendCreateTransaction except it sends the FindPrimaryShard message.. Change-Id: I5d0a3de0b9530a538e2425147fad8ace823763f3 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index ee3a5cc825..58ac1d8b82 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -175,45 +175,47 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { /** * This method is overridden to ensure the previous Tx's ready operations complete - * before we create the next shard Tx in the chain to avoid creation failures if the + * before we initiate the next Tx in the chain to avoid creation failures if the * previous Tx's ready operations haven't completed yet. */ @Override - protected Future sendCreateTransaction(final ActorSelection shard, - final Object serializedCreateMessage) { - + protected Future sendFindPrimaryShardAsync(final String shardName) { // Check if there are any previous ready Futures, otherwise let the super class handle it. if(previousReadyFutures.isEmpty()) { - return super.sendCreateTransaction(shard, serializedCreateMessage); + return super.sendFindPrimaryShardAsync(shardName); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}", + previousReadyFutures.size(), getIdentifier(), getTransactionChainId()); } // Combine the ready Futures into 1. Future> combinedFutures = akka.dispatch.Futures.sequence( - previousReadyFutures, getActorContext().getActorSystem().dispatcher()); + previousReadyFutures, getActorContext().getClientDispatcher()); // Add a callback for completion of the combined Futures. - final Promise createTxPromise = akka.dispatch.Futures.promise(); + final Promise returnPromise = akka.dispatch.Futures.promise(); OnComplete> onComplete = new OnComplete>() { @Override public void onComplete(Throwable failure, Iterable notUsed) { if(failure != null) { // A Ready Future failed so fail the returned Promise. - createTxPromise.failure(failure); + returnPromise.failure(failure); } else { - LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}", + LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}", getIdentifier(), getTransactionChainId()); - // Send the CreateTx message and use the resulting Future to complete the + // Send the FindPrimaryShard message and use the resulting Future to complete the // returned Promise. - createTxPromise.completeWith(getActorContext().executeOperationAsync(shard, - serializedCreateMessage)); + returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName)); } } }; - combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher()); - return createTxPromise.future(); + return returnPromise.future(); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index be7169859d..c1f9c78e69 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -153,8 +153,8 @@ public class TransactionContextImpl extends AbstractTransactionContext { } else { // Throwing an exception here will fail the Future. - throw new IllegalArgumentException(String.format("Invalid reply type %s", - serializedReadyReply.getClass())); + throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", + identifier, serializedReadyReply.getClass())); } } }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index e5119cf299..64b9086c25 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -426,18 +426,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { protected void onTransactionReady(List> cohortFutures) { } - /** - * Method called to send a CreateTransaction message to a shard. - * - * @param shard the shard actor to send to - * @param serializedCreateMessage the serialized message to send - * @return the response Future - */ - protected Future sendCreateTransaction(ActorSelection shard, - Object serializedCreateMessage) { - return actorContext.executeOperationAsync(shard, serializedCreateMessage); - } - @Override public Object getIdentifier() { return this.identifier; @@ -466,14 +454,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return ShardStrategyFactory.getStrategy(path).findShard(path); } + protected Future sendFindPrimaryShardAsync(String shardName) { + return actorContext.findPrimaryShardAsync(shardName); + } + private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { String shardName = shardNameFromIdentifier(path); TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); if(txFutureCallback == null) { - Future findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName); + Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - final TransactionFutureCallback newTxFutureCallback = - new TransactionFutureCallback(shardName); + final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName); txFutureCallback = newTxFutureCallback; txFutureCallbackMap.put(shardName, txFutureCallback); @@ -599,10 +590,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - Future createTxFuture = sendCreateTransaction(primaryShard, - new CreateTransaction(identifier.toString(), - TransactionProxy.this.transactionType.ordinal(), - getTransactionChainId()).toSerializable()); + Object serializedCreateMessage = new CreateTransaction(identifier.toString(), + TransactionProxy.this.transactionType.ordinal(), + getTransactionChainId()).toSerializable(); + + Future createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage); createTxFuture.onComplete(this, actorContext.getClientDispatcher()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index 60625a05fd..4896b059c7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; @@ -65,6 +66,8 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -75,6 +78,8 @@ import scala.concurrent.duration.Duration; * @author Thomas Pantelis */ public abstract class AbstractTransactionProxyTest { + protected final Logger log = LoggerFactory.getLogger(getClass()); + private static ActorSystem system; private final Configuration configuration = new MockConfiguration(); @@ -276,6 +281,8 @@ public abstract class AbstractTransactionProxyTest { protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + log.info("Created mock shard actor {}", actorRef); + doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); @@ -291,13 +298,26 @@ public abstract class AbstractTransactionProxyTest { protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) { - ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem); + ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem); - doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), - eqCreateTransaction(memberName, type)); + return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion, + memberName, shardActorRef); + } - return actorRef; + protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, + TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) { + + ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + log.info("Created mock shard Tx actor {}", txActorRef); + + doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection( + txActorRef.path().toString()); + + doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(prefix, type)); + + return txActorRef; } protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { @@ -375,12 +395,12 @@ public abstract class AbstractTransactionProxyTest { ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); assertEquals("Cohort actor path", expReply, actual); } else { - // Expecting exception. try { Await.result(future, Duration.create(5, TimeUnit.SECONDS)); fail("Expected exception from ready operation Future"); } catch(Exception e) { - // Expected + assertTrue(String.format("Expected exception type %s. Actual %s", + expReply, e.getClass()), ((Class)expReply).isInstance(e)); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index 88ab0dd292..4f00ed5f4b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -10,47 +10,44 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; +import akka.actor.ActorRef; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import scala.concurrent.Promise; -public class TransactionChainProxyTest extends AbstractActorTest{ - ActorContext actorContext = null; - SchemaContext schemaContext = mock(SchemaContext.class); - - @Mock - ActorContext mockActorContext; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - actorContext = new MockActorContext(getSystem()); - actorContext.setSchemaContext(schemaContext); - - doReturn(schemaContext).when(mockActorContext).getSchemaContext(); - doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext(); - } +public class TransactionChainProxyTest extends AbstractTransactionProxyTest { @SuppressWarnings("resource") @Test public void testNewReadOnlyTransaction() throws Exception { - DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadOnlyTransaction(); + DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction(); Assert.assertTrue(dst instanceof DOMStoreReadTransaction); } @@ -58,7 +55,7 @@ public class TransactionChainProxyTest extends AbstractActorTest{ @SuppressWarnings("resource") @Test public void testNewReadWriteTransaction() throws Exception { - DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadWriteTransaction(); + DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction(); Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction); } @@ -66,18 +63,16 @@ public class TransactionChainProxyTest extends AbstractActorTest{ @SuppressWarnings("resource") @Test public void testNewWriteOnlyTransaction() throws Exception { - DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newWriteOnlyTransaction(); + DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction(); Assert.assertTrue(dst instanceof DOMStoreWriteTransaction); } @Test public void testClose() throws Exception { - ActorContext context = mock(ActorContext.class); + new TransactionChainProxy(mockActorContext).close(); - new TransactionChainProxy(context).close(); - - verify(context, times(1)).broadcast(anyObject()); + verify(mockActorContext, times(1)).broadcast(anyObject()); } @Test @@ -115,4 +110,93 @@ public class TransactionChainProxyTest extends AbstractActorTest{ verify(mockActorContext, times(0)).acquireTxCreationPermit(); } + + /** + * Tests 2 successive chained read-write transactions and verifies the second transaction isn't + * initiated until the first one completes its read future. + */ + @Test + public void testChainedReadWriteTransactions() throws Exception { + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + expectBatchedModifications(txActorRef1, 1); + + Promise readyReplyPromise1 = akka.dispatch.Futures.promise(); + doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction(); + + NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx1.write(TestModel.TEST_PATH, writeNode1); + + writeTx1.ready(); + + verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1)); + + String tx2MemberName = "tx2MemberName"; + doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName(); + ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem()); + ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, + DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2); + + expectBatchedModifications(txActorRef2, 1); + + final NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME); + + final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction(); + + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch write2Complete = new CountDownLatch(1); + new Thread() { + @Override + public void run() { + try { + writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2); + } catch (Exception e) { + caughtEx.set(e); + } finally { + write2Complete.countDown(); + } + } + }.start(); + + assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS)); + + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + try { + verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())), + eqCreateTransaction(tx2MemberName, READ_WRITE)); + } catch (AssertionError e) { + fail("Tx 2 should not have initiated until the Tx 1's ready future completed"); + } + + readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get()); + + verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())), + eqCreateTransaction(tx2MemberName, READ_WRITE)); + } + + @Test(expected=IllegalStateException.class) + public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + expectBatchedModifications(actorRef, 1); + + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction(); + + NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx1.write(TestModel.TEST_PATH, writeNode1); + + NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME); + + txChainProxy.newWriteOnlyTransaction(); + } }