Bug 2588: Fix read transaction failures 82/17682/1
authorTom Pantelis <tpanteli@brocade.com>
Thu, 2 Apr 2015 19:45:30 +0000 (15:45 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 2 Apr 2015 19:45:30 +0000 (15:45 -0400)
TransactionContextImpl#read now sends the ReadData message immediately
instead of waiting for the previous modification operation Futures to
complete. This ensures the read will be performed by the actor before
the transaction is readied. Also did the same for dataExists.

Change-Id: I9c56b32162a957e27f9ff01485686e90ebec77a7
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index 839c6c24d6cf81b46d899883229a9f5292721e01..c61682d8efe98cf1649bebc03b381b6afeeb1d76 100644 (file)
@@ -224,58 +224,16 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     }
 
     @Override
-    public void readData(
-            final YangInstanceIdentifier path,final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
+    public void readData(final YangInstanceIdentifier path,
+            final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
 
         LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
 
-        // Send the remaining batched modifications if any.
+        // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
+        // public API contract.
 
         sendAndRecordBatchedModifications();
 
-        // If there were any previous recorded put/merge/delete operation reply Futures then we
-        // must wait for them to successfully complete. This is necessary to honor the read
-        // uncommitted semantics of the public API contract. If any one fails then fail the read.
-
-        if(recordedOperationCount() == 0) {
-            finishReadData(path, returnFuture);
-        } else {
-            LOG.debug("Tx {} readData: verifying {} previous recorded operations",
-                getIdentifier(), recordedOperationCount());
-
-            // Note: we make a copy of recordedOperationFutures to be on the safe side in case
-            // Futures#sequence accesses the passed List on a different thread, as
-            // recordedOperationFutures is not synchronized.
-
-            Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
-                copyRecordedOperationFutures(), actorContext.getClientDispatcher());
-
-            OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
-                @Override
-                public void onComplete(Throwable failure, Iterable<Object> notUsed)
-                        throws Throwable {
-                    if(failure != null) {
-                        LOG.debug("Tx {} readData: a recorded operation failed: {}",
-                            getIdentifier(), failure);
-                        returnFuture.setException(new ReadFailedException(
-                                "The read could not be performed because a previous put, merge,"
-                                + "or delete operation failed", failure));
-                    } else {
-                        finishReadData(path, returnFuture);
-                    }
-                }
-            };
-
-            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
-        }
-
-    }
-
-    private void finishReadData(final YangInstanceIdentifier path,
-            final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
-
-        LOG.debug("Tx {} finishReadData called path = {}", getIdentifier(), path);
-
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object readResponse) throws Throwable {
@@ -313,53 +271,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
         LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
 
-        // Send the remaining batched modifications if any.
+        // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
+        // public API contract.
 
         sendAndRecordBatchedModifications();
 
-        // If there were any previous recorded put/merge/delete operation reply Futures then we
-        // must wait for them to successfully complete. This is necessary to honor the read
-        // uncommitted semantics of the public API contract. If any one fails then fail this
-        // request.
-
-        if(recordedOperationCount() == 0) {
-            finishDataExists(path, returnFuture);
-        } else {
-            LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
-                getIdentifier(), recordedOperationCount());
-
-            // Note: we make a copy of recordedOperationFutures to be on the safe side in case
-            // Futures#sequence accesses the passed List on a different thread, as
-            // recordedOperationFutures is not synchronized.
-
-            Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
-                    copyRecordedOperationFutures(),
-                    actorContext.getClientDispatcher());
-            OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
-                @Override
-                public void onComplete(Throwable failure, Iterable<Object> notUsed)
-                        throws Throwable {
-                    if(failure != null) {
-                        LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
-                            getIdentifier(), failure);
-                        returnFuture.setException(new ReadFailedException(
-                                "The data exists could not be performed because a previous "
-                                + "put, merge, or delete operation failed", failure));
-                    } else {
-                        finishDataExists(path, returnFuture);
-                    }
-                }
-            };
-
-            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
-        }
-    }
-
-    private void finishDataExists(final YangInstanceIdentifier path,
-            final SettableFuture<Boolean> returnFuture) {
-
-        LOG.debug("Tx {} finishDataExists called path = {}", getIdentifier(), path);
-
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object response) throws Throwable {
index 265ec59f1cd324889627281850ffae9ed456c52a..a2471001864c1571a8a52d4abf0b8f41e7571ea3 100644 (file)
@@ -10,7 +10,6 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
@@ -164,34 +163,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         testReadWithExceptionOnInitialCreateTransaction(new TestException());
     }
 
-    @Test(expected = TestException.class)
-    public void testReadWithPriorRecordingOperationFailure() throws Throwable {
-        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
-                when(mockActorContext).getDatastoreContext();
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectFailedBatchedModifications(actorRef);
-
-        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        try {
-            propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
-        } finally {
-            verify(mockActorContext, times(0)).executeOperationAsync(
-                    eq(actorSelection(actorRef)), eqSerializedReadData());
-        }
-    }
-
     @Test
     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
@@ -301,35 +272,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
     }
 
-    @Test(expected = TestException.class)
-    public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
-        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
-                when(mockActorContext).getDatastoreContext();
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectFailedBatchedModifications(actorRef);
-
-        doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        try {
-            propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
-        } finally {
-            verify(mockActorContext, times(0)).executeOperationAsync(
-                    eq(actorSelection(actorRef)), eqSerializedDataExists());
-        }
-    }
-
     @Test
     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);