Merge "Remove recorded modification Futures from TransactionContext"
authorMoiz Raja <moraja@cisco.com>
Fri, 10 Apr 2015 12:10:23 +0000 (12:10 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 10 Apr 2015 12:10:24 +0000 (12:10 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index d94e1c691e704051a81f74c2ba3ec135e1da002e..81605d8c8fe261a026187bd9245031b966f9a01a 100644 (file)
@@ -7,40 +7,17 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import scala.concurrent.Future;
 
 abstract class AbstractTransactionContext implements TransactionContext {
 
-    private final List<Future<Object>> recordedOperationFutures = new ArrayList<>();
     private final TransactionIdentifier identifier;
 
     protected AbstractTransactionContext(TransactionIdentifier identifier) {
         this.identifier = identifier;
     }
 
-    @Override
-    public final void copyRecordedOperationFutures(Collection<Future<Object>> target) {
-        target.addAll(recordedOperationFutures);
-    }
-
     protected final TransactionIdentifier getIdentifier() {
         return identifier;
     }
-
-    protected final Collection<Future<Object>> copyRecordedOperationFutures() {
-        return ImmutableList.copyOf(recordedOperationFutures);
-    }
-
-    protected final int recordedOperationCount() {
-        return recordedOperationFutures.size();
-    }
-
-    protected final void recordOperationFuture(Future<Object> future) {
-        recordedOperationFutures.add(future);
-    }
-}
+}
\ No newline at end of file
index a5a7494e1a0930d6dab535b920198a1d4773f8a5..bc6e5f229fe04aacffd8e719c138ba394b27bbd8 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.Collection;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.Future;
@@ -33,6 +32,4 @@ interface TransactionContext {
     void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture);
 
     void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
-
-    void copyRecordedOperationFutures(Collection<Future<Object>> target);
 }
index c61682d8efe98cf1649bebc03b381b6afeeb1d76..b9900889b1125e8ac229f89af9b4f64551209b8b 100644 (file)
@@ -11,9 +11,7 @@ import akka.actor.ActorSelection;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
@@ -93,51 +91,30 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-            getIdentifier(), recordedOperationCount());
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the remaining batched modifications if any.
 
-        sendAndRecordBatchedModifications();
+        sendBatchedModifications();
 
         // Send the ReadyTransaction message to the Tx actor.
 
         Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
-        return combineRecordedOperationsFutures(readyReplyFuture);
+        return transformReadyReply(readyReplyFuture);
     }
 
-    protected Future<ActorSelection> combineRecordedOperationsFutures(final Future<Object> withLastReplyFuture) {
-        // Combine all the previously recorded put/merge/delete operation reply Futures and the
-        // ReadyTransactionReply Future into one Future. If any one fails then the combined
-        // Future will fail. We need all prior operations and the ready operation to succeed
-        // in order to attempt commit.
+    protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
+        // Transform the last reply Future into a Future that returns the cohort actor path from
+        // the last reply message. That's the end result of the ready operation.
 
-        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationCount() + 1);
-        copyRecordedOperationFutures(futureList);
-        futureList.add(withLastReplyFuture);
-
-        Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
-                actorContext.getClientDispatcher());
-
-        // Transform the combined Future into a Future that returns the cohort actor path from
-        // the ReadyTransactionReply. That's the end result of the ready operation.
-
-        return combinedFutures.transform(new Mapper<Iterable<Object>, ActorSelection>() {
+        return readyReplyFuture.transform(new Mapper<Object, ActorSelection>() {
             @Override
-            public ActorSelection checkedApply(Iterable<Object> notUsed) {
-                LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
-                    getIdentifier());
-
-                // At this point all the Futures succeeded and we need to extract the cohort
-                // actor path from the ReadyTransactionReply. For the recorded operations, they
-                // don't return any data so we're only interested that they completed
-                // successfully. We could be paranoid and verify the correct reply types but
-                // that really should never happen so it's not worth the overhead of
-                // de-serializing each reply.
-
-                // Note the Future get call here won't block as it's complete.
-                Object serializedReadyReply = withLastReplyFuture.value().get().get();
+            public ActorSelection checkedApply(Object serializedReadyReply) {
+                LOG.debug("Tx {} readyTransaction", getIdentifier());
+
+                // At this point the rwady operation succeeded and we need to extract the cohort
+                // actor path from the reply.
                 if (serializedReadyReply instanceof ReadyTransactionReply) {
                     return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
                 } else if(serializedReadyReply instanceof BatchedModificationsReply) {
@@ -169,14 +146,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
         if(batchedModifications.getModifications().size() >=
                 actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
-            sendAndRecordBatchedModifications();
-        }
-    }
-
-    private void sendAndRecordBatchedModifications() {
-        Future<Object> sentFuture = sendBatchedModifications();
-        if(sentFuture != null) {
-            recordOperationFuture(sentFuture);
+            sendBatchedModifications();
         }
     }
 
@@ -232,7 +202,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
-        sendAndRecordBatchedModifications();
+        sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
@@ -274,7 +244,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
-        sendAndRecordBatchedModifications();
+        sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
index 5f9cc83618b760705a73a02d1260df5a77ecda03..0fd37b9ecef937083f34cc40ce011c5e694a84ec 100644 (file)
@@ -158,19 +158,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return new TransactionIdentifier(memberName, counter.getAndIncrement());
     }
 
-    @VisibleForTesting
-    List<Future<Object>> getRecordedOperationFutures() {
-        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
-        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            if (transactionContext != null) {
-                transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
-            }
-        }
-
-        return recordedOperationFutures;
-    }
-
     @VisibleForTesting
     boolean hasTransactionContext() {
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
index e1313540c44868e7f8d81dc4f0a857411bf20a15..b9fe90dc1308aca5beb99007baa89b5200b97f20 100644 (file)
@@ -32,13 +32,12 @@ public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-            getIdentifier(), recordedOperationCount());
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the remaining batched modifications if any.
 
         Future<Object> lastModificationsFuture = sendBatchedModifications(true);
 
-        return combineRecordedOperationsFutures(lastModificationsFuture);
+        return transformReadyReply(lastModificationsFuture);
     }
 }
index c3450333a46447d50aa16f6021fc2d48592a768b..9509b06ba7725c8a518223c9c146bd60bf6a4545 100644 (file)
@@ -45,32 +45,28 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
-        recordOperationFuture(executeOperationAsync(
-                new DeleteData(path, getRemoteTransactionVersion())));
+        executeOperationAsync(new DeleteData(path, getRemoteTransactionVersion()));
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordOperationFuture(executeOperationAsync(
-                new MergeData(path, data, getRemoteTransactionVersion())));
+        executeOperationAsync(new MergeData(path, data, getRemoteTransactionVersion()));
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordOperationFuture(executeOperationAsync(
-                new WriteData(path, data, getRemoteTransactionVersion())));
+        executeOperationAsync(new WriteData(path, data, getRemoteTransactionVersion()));
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-            getIdentifier(), recordedOperationCount());
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the ReadyTransaction message to the Tx actor.
 
         Future<Object> lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
-        return combineRecordedOperationsFutures(lastReplyFuture);
+        return transformReadyReply(lastReplyFuture);
     }
 
     @Override
index 29a5b09c5c8e33fcb8892054564d4264dd2a711d..b95eaf64d75d2ab83bd5cc2ae1c94f7e2f50e2c4 100644 (file)
@@ -3,7 +3,6 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -41,7 +40,6 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
@@ -61,10 +59,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.Promise;
-import scala.concurrent.duration.Duration;
 
 @SuppressWarnings("resource")
 public class TransactionProxyTest extends AbstractTransactionProxyTest {
@@ -313,29 +308,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         transactionProxy.exists(TestModel.TEST_PATH);
     }
 
-    private void verifyRecordingOperationFutures(List<Future<Object>> futures,
-            Class<?>... expResultTypes) throws Exception {
-        assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
-
-        int i = 0;
-        for( Future<Object> future: futures) {
-            assertNotNull("Recording operation Future is null", future);
-
-            Class<?> expResultType = expResultTypes[i++];
-            if(Throwable.class.isAssignableFrom(expResultType)) {
-                try {
-                    Await.result(future, Duration.create(5, TimeUnit.SECONDS));
-                    fail("Expected exception from recording operation Future");
-                } catch(Exception e) {
-                    // Expected
-                }
-            } else {
-                assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
-                             Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
-            }
-        }
-    }
-
     @Test
     public void testWrite() throws Exception {
         dataStoreContextBuilder.shardBatchedModificationCount(1);
@@ -405,9 +377,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         transactionProxy.ready();
 
         verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
     }
 
     @Test(expected=IllegalStateException.class)
@@ -479,9 +448,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
-
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
@@ -511,8 +477,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
-
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
@@ -544,9 +508,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
-
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
@@ -561,33 +522,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
     }
 
-    @Test
-    public void testReadyWithRecordingOperationFailure() throws Exception {
-        dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectFailedBatchedModifications(actorRef);
-
-        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
-
-        verifyCohortFutures(proxy, TestException.class);
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
-    }
-
     @Test
     public void testReadyWithReplyFailure() throws Exception {
         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
@@ -1245,14 +1179,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
         verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
                 new DeleteModification(deletePath2));
-
-        if(optimizedWriteOnly) {
-            verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                    BatchedModificationsReply.class, BatchedModificationsReply.class);
-        } else {
-            verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                    BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
-        }
     }
 
     @Test
@@ -1357,9 +1283,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
     }
 
     @Test