Modify ChainedTransactionProxy to override sending of FindPrimaryShard 59/16159/3
authorTom Pantelis <tpanteli@brocade.com>
Sun, 8 Mar 2015 02:16:37 +0000 (21:16 -0500)
committerMoiz Raja <moraja@cisco.com>
Tue, 10 Mar 2015 18:55:17 +0000 (18:55 +0000)
Previously, ChainedTransactionProxy overrode the sendCreateTransaction
method from TransactionProxy to intercept the sending of the
CreateTransaction message in order to asynchronously wait for the
previous
transaction's ready Futures to complete before sending the message.

To facilitate write-only transaction optimizations which will not create
a separate transaction actor, we need to intercept the FindPrimaryShard
message instead. Thus a new overridden method,
sendFindPrimaryShardAsync, was added the same as sendCreateTransaction
except it sends the FindPrimaryShard message..

Change-Id: I5d0a3de0b9530a538e2425147fad8ace823763f3
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java

index ee3a5cc82573d6e415a5bbcd080277673e4cb051..58ac1d8b8265bc50fb7d38dea1dd9c1b916211fc 100644 (file)
@@ -175,45 +175,47 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
 
         /**
          * This method is overridden to ensure the previous Tx's ready operations complete
-         * before we create the next shard Tx in the chain to avoid creation failures if the
+         * before we initiate the next Tx in the chain to avoid creation failures if the
          * previous Tx's ready operations haven't completed yet.
          */
         @Override
-        protected Future<Object> sendCreateTransaction(final ActorSelection shard,
-                final Object serializedCreateMessage) {
-
+        protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
             // Check if there are any previous ready Futures, otherwise let the super class handle it.
             if(previousReadyFutures.isEmpty()) {
-                return super.sendCreateTransaction(shard, serializedCreateMessage);
+                return super.sendFindPrimaryShardAsync(shardName);
+            }
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
+                        previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
             }
 
             // Combine the ready Futures into 1.
             Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
-                    previousReadyFutures, getActorContext().getActorSystem().dispatcher());
+                    previousReadyFutures, getActorContext().getClientDispatcher());
 
             // Add a callback for completion of the combined Futures.
-            final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
+            final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
             OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
                 @Override
                 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
                     if(failure != null) {
                         // A Ready Future failed so fail the returned Promise.
-                        createTxPromise.failure(failure);
+                        returnPromise.failure(failure);
                     } else {
-                        LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}",
+                        LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
                                 getIdentifier(), getTransactionChainId());
 
-                        // Send the CreateTx message and use the resulting Future to complete the
+                        // Send the FindPrimaryShard message and use the resulting Future to complete the
                         // returned Promise.
-                        createTxPromise.completeWith(getActorContext().executeOperationAsync(shard,
-                                serializedCreateMessage));
+                        returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
                     }
                 }
             };
 
-            combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher());
+            combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
 
-            return createTxPromise.future();
+            return returnPromise.future();
         }
     }
 }
index be7169859db56be2851028b2d538c9ce29210c9f..c1f9c78e69ec683586147e01605ab7168f786e1b 100644 (file)
@@ -153,8 +153,8 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
                 } else {
                     // Throwing an exception here will fail the Future.
-                    throw new IllegalArgumentException(String.format("Invalid reply type %s",
-                            serializedReadyReply.getClass()));
+                    throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+                            identifier, serializedReadyReply.getClass()));
                 }
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
index e5119cf299c4d5e8b10974c25e5f9f02cc12c7f4..64b9086c250c16f759a417003d0cefc9839f688b 100644 (file)
@@ -426,18 +426,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
     }
 
-    /**
-     * Method called to send a CreateTransaction message to a shard.
-     *
-     * @param shard the shard actor to send to
-     * @param serializedCreateMessage the serialized message to send
-     * @return the response Future
-     */
-    protected Future<Object> sendCreateTransaction(ActorSelection shard,
-            Object serializedCreateMessage) {
-        return actorContext.executeOperationAsync(shard, serializedCreateMessage);
-    }
-
     @Override
     public Object getIdentifier() {
         return this.identifier;
@@ -466,14 +454,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
+    protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+        return actorContext.findPrimaryShardAsync(shardName);
+    }
+
     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
         String shardName = shardNameFromIdentifier(path);
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
-            Future<ActorSelection> findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName);
+            Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
 
-            final TransactionFutureCallback newTxFutureCallback =
-                    new TransactionFutureCallback(shardName);
+            final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
 
             txFutureCallback = newTxFutureCallback;
             txFutureCallbackMap.put(shardName, txFutureCallback);
@@ -599,10 +590,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
-                    new CreateTransaction(identifier.toString(),
-                            TransactionProxy.this.transactionType.ordinal(),
-                            getTransactionChainId()).toSerializable());
+            Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
+                    TransactionProxy.this.transactionType.ordinal(),
+                    getTransactionChainId()).toSerializable();
+
+            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
 
             createTxFuture.onComplete(this, actorContext.getClientDispatcher());
         }
index 60625a05fd1e2796136fb146301b69b252700379..4896b059c794284996019cc696e25224cf821a4d 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
@@ -65,6 +66,8 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -75,6 +78,8 @@ import scala.concurrent.duration.Duration;
  * @author Thomas Pantelis
  */
 public abstract class AbstractTransactionProxyTest {
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
     private static ActorSystem system;
 
     private final Configuration configuration = new MockConfiguration();
@@ -276,6 +281,8 @@ public abstract class AbstractTransactionProxyTest {
 
     protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+        log.info("Created mock shard actor {}", actorRef);
+
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
 
@@ -291,13 +298,26 @@ public abstract class AbstractTransactionProxyTest {
 
     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
             TransactionType type, int transactionVersion) {
-        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
+        ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
 
-        doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
-                executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
-                        eqCreateTransaction(memberName, type));
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
+                memberName, shardActorRef);
+    }
 
-        return actorRef;
+    protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) {
+
+        ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+        log.info("Created mock shard Tx actor {}", txActorRef);
+
+        doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection(
+                txActorRef.path().toString());
+
+        doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(prefix, type));
+
+        return txActorRef;
     }
 
     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
@@ -375,12 +395,12 @@ public abstract class AbstractTransactionProxyTest {
                     ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
                     assertEquals("Cohort actor path", expReply, actual);
                 } else {
-                    // Expecting exception.
                     try {
                         Await.result(future, Duration.create(5, TimeUnit.SECONDS));
                         fail("Expected exception from ready operation Future");
                     } catch(Exception e) {
-                        // Expected
+                        assertTrue(String.format("Expected exception type %s. Actual %s",
+                                expReply, e.getClass()), ((Class<?>)expReply).isInstance(e));
                     }
                 }
             }
index 88ab0dd292b4894f0eac4c8ae782452d9d696471..4f00ed5f4bcfe4e88908d9bf2ca25417c3658e18 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import akka.actor.ActorRef;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Promise;
 
-public class TransactionChainProxyTest extends AbstractActorTest{
-    ActorContext actorContext = null;
-    SchemaContext schemaContext = mock(SchemaContext.class);
-
-    @Mock
-    ActorContext mockActorContext;
-
-    @Before
-    public void setUp() {
-        MockitoAnnotations.initMocks(this);
-
-        actorContext = new MockActorContext(getSystem());
-        actorContext.setSchemaContext(schemaContext);
-
-        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
-        doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext();
-    }
+public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
     @SuppressWarnings("resource")
     @Test
     public void testNewReadOnlyTransaction() throws Exception {
 
-     DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadOnlyTransaction();
+     DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction();
          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
 
     }
@@ -58,7 +55,7 @@ public class TransactionChainProxyTest extends AbstractActorTest{
     @SuppressWarnings("resource")
     @Test
     public void testNewReadWriteTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadWriteTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction();
         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
 
     }
@@ -66,18 +63,16 @@ public class TransactionChainProxyTest extends AbstractActorTest{
     @SuppressWarnings("resource")
     @Test
     public void testNewWriteOnlyTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newWriteOnlyTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction();
         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
 
     }
 
     @Test
     public void testClose() throws Exception {
-        ActorContext context = mock(ActorContext.class);
+        new TransactionChainProxy(mockActorContext).close();
 
-        new TransactionChainProxy(context).close();
-
-        verify(context, times(1)).broadcast(anyObject());
+        verify(mockActorContext, times(1)).broadcast(anyObject());
     }
 
     @Test
@@ -115,4 +110,93 @@ public class TransactionChainProxyTest extends AbstractActorTest{
 
         verify(mockActorContext, times(0)).acquireTxCreationPermit();
     }
+
+    /**
+     * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
+     * initiated until the first one completes its read future.
+     */
+    @Test
+    public void testChainedReadWriteTransactions() throws Exception {
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        expectBatchedModifications(txActorRef1, 1);
+
+        Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
+        doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+        DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
+
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+        writeTx1.ready();
+
+        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1));
+
+        String tx2MemberName = "tx2MemberName";
+        doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
+        ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+        ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
+                DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
+
+        expectBatchedModifications(txActorRef2, 1);
+
+        final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
+
+        final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+        final CountDownLatch write2Complete = new CountDownLatch(1);
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
+                } catch (Exception e) {
+                    caughtEx.set(e);
+                } finally {
+                    write2Complete.countDown();
+                }
+            }
+        }.start();
+
+        assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
+
+        if(caughtEx.get() != null) {
+            throw caughtEx.get();
+        }
+
+        try {
+            verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
+                    eqCreateTransaction(tx2MemberName, READ_WRITE));
+        } catch (AssertionError e) {
+            fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
+        }
+
+        readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
+
+        verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
+                eqCreateTransaction(tx2MemberName, READ_WRITE));
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+        expectBatchedModifications(actorRef, 1);
+
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
+
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+        NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        txChainProxy.newWriteOnlyTransaction();
+    }
 }