X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextImpl.java;h=c61682d8efe98cf1649bebc03b381b6afeeb1d76;hb=e1a6ed792b504c2978c5259f926eaa09257c694c;hp=839c6c24d6cf81b46d899883229a9f5292721e01;hpb=c389b6dc9c717695f88a83c01a9fc67a1df68aac;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index 839c6c24d6..c61682d8ef 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -224,58 +224,16 @@ public class TransactionContextImpl extends AbstractTransactionContext { } @Override - public void readData( - final YangInstanceIdentifier path,final SettableFuture>> returnFuture ) { + public void readData(final YangInstanceIdentifier path, + final SettableFuture>> returnFuture ) { LOG.debug("Tx {} readData called path = {}", getIdentifier(), path); - // Send the remaining batched modifications if any. + // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the + // public API contract. sendAndRecordBatchedModifications(); - // If there were any previous recorded put/merge/delete operation reply Futures then we - // must wait for them to successfully complete. This is necessary to honor the read - // uncommitted semantics of the public API contract. If any one fails then fail the read. - - if(recordedOperationCount() == 0) { - finishReadData(path, returnFuture); - } else { - LOG.debug("Tx {} readData: verifying {} previous recorded operations", - getIdentifier(), recordedOperationCount()); - - // Note: we make a copy of recordedOperationFutures to be on the safe side in case - // Futures#sequence accesses the passed List on a different thread, as - // recordedOperationFutures is not synchronized. - - Future> combinedFutures = akka.dispatch.Futures.sequence( - copyRecordedOperationFutures(), actorContext.getClientDispatcher()); - - OnComplete> onComplete = new OnComplete>() { - @Override - public void onComplete(Throwable failure, Iterable notUsed) - throws Throwable { - if(failure != null) { - LOG.debug("Tx {} readData: a recorded operation failed: {}", - getIdentifier(), failure); - returnFuture.setException(new ReadFailedException( - "The read could not be performed because a previous put, merge," - + "or delete operation failed", failure)); - } else { - finishReadData(path, returnFuture); - } - } - }; - - combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher()); - } - - } - - private void finishReadData(final YangInstanceIdentifier path, - final SettableFuture>> returnFuture) { - - LOG.debug("Tx {} finishReadData called path = {}", getIdentifier(), path); - OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object readResponse) throws Throwable { @@ -313,53 +271,11 @@ public class TransactionContextImpl extends AbstractTransactionContext { LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path); - // Send the remaining batched modifications if any. + // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the + // public API contract. sendAndRecordBatchedModifications(); - // If there were any previous recorded put/merge/delete operation reply Futures then we - // must wait for them to successfully complete. This is necessary to honor the read - // uncommitted semantics of the public API contract. If any one fails then fail this - // request. - - if(recordedOperationCount() == 0) { - finishDataExists(path, returnFuture); - } else { - LOG.debug("Tx {} dataExists: verifying {} previous recorded operations", - getIdentifier(), recordedOperationCount()); - - // Note: we make a copy of recordedOperationFutures to be on the safe side in case - // Futures#sequence accesses the passed List on a different thread, as - // recordedOperationFutures is not synchronized. - - Future> combinedFutures = akka.dispatch.Futures.sequence( - copyRecordedOperationFutures(), - actorContext.getClientDispatcher()); - OnComplete> onComplete = new OnComplete>() { - @Override - public void onComplete(Throwable failure, Iterable notUsed) - throws Throwable { - if(failure != null) { - LOG.debug("Tx {} dataExists: a recorded operation failed: {}", - getIdentifier(), failure); - returnFuture.setException(new ReadFailedException( - "The data exists could not be performed because a previous " - + "put, merge, or delete operation failed", failure)); - } else { - finishDataExists(path, returnFuture); - } - } - }; - - combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher()); - } - } - - private void finishDataExists(final YangInstanceIdentifier path, - final SettableFuture returnFuture) { - - LOG.debug("Tx {} finishDataExists called path = {}", getIdentifier(), path); - OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object response) throws Throwable {