Remove recorded modification Futures from TransactionContext 77/17777/3
authorTom Pantelis <tpanteli@brocade.com>
Fri, 3 Apr 2015 09:39:56 +0000 (05:39 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 9 Apr 2015 11:50:46 +0000 (07:50 -0400)
This removes the recorded batched modification message Futures
from TransactionContext to avoid the overhead of waiting for
the Futures to complete on ready and transforming them. The
Futures were recorded to ensure all modification messages
were successfully processed before committing the transaction
but we can do that by including the count of modification
messages sent in the ready message for the transaction actor
to verify (later patch).

Change-Id: I8d6ad80cac3e8e13fde48fbf5c57a476cca003dd
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index d94e1c6..81605d8 100644 (file)
@@ -7,40 +7,17 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import scala.concurrent.Future;
 
 abstract class AbstractTransactionContext implements TransactionContext {
 
-    private final List<Future<Object>> recordedOperationFutures = new ArrayList<>();
     private final TransactionIdentifier identifier;
 
     protected AbstractTransactionContext(TransactionIdentifier identifier) {
         this.identifier = identifier;
     }
 
-    @Override
-    public final void copyRecordedOperationFutures(Collection<Future<Object>> target) {
-        target.addAll(recordedOperationFutures);
-    }
-
     protected final TransactionIdentifier getIdentifier() {
         return identifier;
     }
-
-    protected final Collection<Future<Object>> copyRecordedOperationFutures() {
-        return ImmutableList.copyOf(recordedOperationFutures);
-    }
-
-    protected final int recordedOperationCount() {
-        return recordedOperationFutures.size();
-    }
-
-    protected final void recordOperationFuture(Future<Object> future) {
-        recordedOperationFutures.add(future);
-    }
-}
+}
\ No newline at end of file
index a5a7494..bc6e5f2 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.Collection;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.Future;
@@ -33,6 +32,4 @@ interface TransactionContext {
     void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture);
 
     void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
-
-    void copyRecordedOperationFutures(Collection<Future<Object>> target);
 }
index c61682d..b990088 100644 (file)
@@ -11,9 +11,7 @@ import akka.actor.ActorSelection;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
@@ -93,51 +91,30 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-            getIdentifier(), recordedOperationCount());
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the remaining batched modifications if any.
 
-        sendAndRecordBatchedModifications();
+        sendBatchedModifications();
 
         // Send the ReadyTransaction message to the Tx actor.
 
         Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
-        return combineRecordedOperationsFutures(readyReplyFuture);
+        return transformReadyReply(readyReplyFuture);
     }
 
-    protected Future<ActorSelection> combineRecordedOperationsFutures(final Future<Object> withLastReplyFuture) {
-        // Combine all the previously recorded put/merge/delete operation reply Futures and the
-        // ReadyTransactionReply Future into one Future. If any one fails then the combined
-        // Future will fail. We need all prior operations and the ready operation to succeed
-        // in order to attempt commit.
+    protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
+        // Transform the last reply Future into a Future that returns the cohort actor path from
+        // the last reply message. That's the end result of the ready operation.
 
-        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationCount() + 1);
-        copyRecordedOperationFutures(futureList);
-        futureList.add(withLastReplyFuture);
-
-        Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
-                actorContext.getClientDispatcher());
-
-        // Transform the combined Future into a Future that returns the cohort actor path from
-        // the ReadyTransactionReply. That's the end result of the ready operation.
-
-        return combinedFutures.transform(new Mapper<Iterable<Object>, ActorSelection>() {
+        return readyReplyFuture.transform(new Mapper<Object, ActorSelection>() {
             @Override
-            public ActorSelection checkedApply(Iterable<Object> notUsed) {
-                LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
-                    getIdentifier());
-
-                // At this point all the Futures succeeded and we need to extract the cohort
-                // actor path from the ReadyTransactionReply. For the recorded operations, they
-                // don't return any data so we're only interested that they completed
-                // successfully. We could be paranoid and verify the correct reply types but
-                // that really should never happen so it's not worth the overhead of
-                // de-serializing each reply.
-
-                // Note the Future get call here won't block as it's complete.
-                Object serializedReadyReply = withLastReplyFuture.value().get().get();
+            public ActorSelection checkedApply(Object serializedReadyReply) {
+                LOG.debug("Tx {} readyTransaction", getIdentifier());
+
+                // At this point the rwady operation succeeded and we need to extract the cohort
+                // actor path from the reply.
                 if (serializedReadyReply instanceof ReadyTransactionReply) {
                     return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
                 } else if(serializedReadyReply instanceof BatchedModificationsReply) {
@@ -169,14 +146,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
         if(batchedModifications.getModifications().size() >=
                 actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
-            sendAndRecordBatchedModifications();
-        }
-    }
-
-    private void sendAndRecordBatchedModifications() {
-        Future<Object> sentFuture = sendBatchedModifications();
-        if(sentFuture != null) {
-            recordOperationFuture(sentFuture);
+            sendBatchedModifications();
         }
     }
 
@@ -232,7 +202,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
-        sendAndRecordBatchedModifications();
+        sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
@@ -274,7 +244,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
-        sendAndRecordBatchedModifications();
+        sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
index 5f9cc83..0fd37b9 100644 (file)
@@ -158,19 +158,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return new TransactionIdentifier(memberName, counter.getAndIncrement());
     }
 
-    @VisibleForTesting
-    List<Future<Object>> getRecordedOperationFutures() {
-        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
-        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            if (transactionContext != null) {
-                transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
-            }
-        }
-
-        return recordedOperationFutures;
-    }
-
     @VisibleForTesting
     boolean hasTransactionContext() {
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
index e131354..b9fe90d 100644 (file)
@@ -32,13 +32,12 @@ public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-            getIdentifier(), recordedOperationCount());
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the remaining batched modifications if any.
 
         Future<Object> lastModificationsFuture = sendBatchedModifications(true);
 
-        return combineRecordedOperationsFutures(lastModificationsFuture);
+        return transformReadyReply(lastModificationsFuture);
     }
 }
index c345033..9509b06 100644 (file)
@@ -45,32 +45,28 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
-        recordOperationFuture(executeOperationAsync(
-                new DeleteData(path, getRemoteTransactionVersion())));
+        executeOperationAsync(new DeleteData(path, getRemoteTransactionVersion()));
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordOperationFuture(executeOperationAsync(
-                new MergeData(path, data, getRemoteTransactionVersion())));
+        executeOperationAsync(new MergeData(path, data, getRemoteTransactionVersion()));
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordOperationFuture(executeOperationAsync(
-                new WriteData(path, data, getRemoteTransactionVersion())));
+        executeOperationAsync(new WriteData(path, data, getRemoteTransactionVersion()));
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-            getIdentifier(), recordedOperationCount());
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the ReadyTransaction message to the Tx actor.
 
         Future<Object> lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
-        return combineRecordedOperationsFutures(lastReplyFuture);
+        return transformReadyReply(lastReplyFuture);
     }
 
     @Override
index 29a5b09..b95eaf6 100644 (file)
@@ -3,7 +3,6 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -41,7 +40,6 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
@@ -61,10 +59,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.Promise;
-import scala.concurrent.duration.Duration;
 
 @SuppressWarnings("resource")
 public class TransactionProxyTest extends AbstractTransactionProxyTest {
@@ -313,29 +308,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         transactionProxy.exists(TestModel.TEST_PATH);
     }
 
-    private void verifyRecordingOperationFutures(List<Future<Object>> futures,
-            Class<?>... expResultTypes) throws Exception {
-        assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
-
-        int i = 0;
-        for( Future<Object> future: futures) {
-            assertNotNull("Recording operation Future is null", future);
-
-            Class<?> expResultType = expResultTypes[i++];
-            if(Throwable.class.isAssignableFrom(expResultType)) {
-                try {
-                    Await.result(future, Duration.create(5, TimeUnit.SECONDS));
-                    fail("Expected exception from recording operation Future");
-                } catch(Exception e) {
-                    // Expected
-                }
-            } else {
-                assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
-                             Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
-            }
-        }
-    }
-
     @Test
     public void testWrite() throws Exception {
         dataStoreContextBuilder.shardBatchedModificationCount(1);
@@ -405,9 +377,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         transactionProxy.ready();
 
         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
     }
 
     @Test(expected=IllegalStateException.class)
@@ -479,9 +448,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
-
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
@@ -511,8 +477,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
-
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
@@ -544,9 +508,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
-
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
@@ -561,33 +522,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
     }
 
-    @Test
-    public void testReadyWithRecordingOperationFailure() throws Exception {
-        dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectFailedBatchedModifications(actorRef);
-
-        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
-
-        verifyCohortFutures(proxy, TestException.class);
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
-    }
-
     @Test
     public void testReadyWithReplyFailure() throws Exception {
         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
@@ -1245,14 +1179,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
         verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
                 new DeleteModification(deletePath2));
-
-        if(optimizedWriteOnly) {
-            verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                    BatchedModificationsReply.class, BatchedModificationsReply.class);
-        } else {
-            verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                    BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
-        }
     }
 
     @Test
@@ -1357,9 +1283,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
     }
 
     @Test

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.