X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=5bc53442aeff04aa43c299a49890b3ecfe70b974;hp=d79cd6f69f4b0e3e4f1171a332b45c36adbbd515;hb=413bae822cdbf37f4dc16ebe14cab621953e817a;hpb=166c432bc0611288abf2e13ef8f184cfbb2c101a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index d79cd6f69f..5bc53442ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -18,8 +18,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; @@ -257,14 +255,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); + final SettableFuture>> proxyFuture = SettableFuture.create(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - return txFutureCallback.enqueueReadOperation(new ReadOperation>>() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override - public CheckedFuture>, ReadFailedException> invoke( - TransactionContext transactionContext) { - return transactionContext.readData(path); + public void invoke(TransactionContext transactionContext) { + transactionContext.readData(path, proxyFuture); } }); + + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } @Override @@ -277,15 +278,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); + final SettableFuture proxyFuture = SettableFuture.create(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - return txFutureCallback.enqueueReadOperation(new ReadOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override - public CheckedFuture invoke(TransactionContext transactionContext) { - return transactionContext.dataExists(path); + public void invoke(TransactionContext transactionContext) { + transactionContext.dataExists(path, proxyFuture); } }); - } + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); + } private void checkModificationState() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, @@ -323,7 +327,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.writeData(path, data); @@ -341,7 +345,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.mergeData(path, data); @@ -359,7 +363,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.deleteData(path); @@ -386,12 +390,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier, txFutureCallback.getShardName(), transactionChainId); - Future future = txFutureCallback.enqueueFutureOperation(new FutureOperation() { - @Override - public Future invoke(TransactionContext transactionContext) { - return transactionContext.readyTransaction(); - } - }); + final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + final Future future; + if (transactionContext != null) { + // avoid the creation of a promise and a TransactionOperation + future = transactionContext.readyTransaction(); + } else { + final Promise promise = akka.dispatch.Futures.promise(); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(transactionContext.readyTransaction()); + } + }); + future = promise.future(); + } cohortFutures.add(future); } @@ -430,7 +443,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void close() { for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.closeTransaction(); @@ -492,20 +505,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { void invoke(TransactionContext transactionContext); } - /** - * This interface returns a Guava Future - */ - private static interface ReadOperation { - CheckedFuture invoke(TransactionContext transactionContext); - } - - /** - * This interface returns a Scala Future - */ - private static interface FutureOperation { - Future invoke(TransactionContext transactionContext); - } - /** * Implements a Future OnComplete callback for a CreateTransaction message. This class handles * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a @@ -582,64 +581,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - - Future enqueueFutureOperation(final FutureOperation op) { - - Future future; - - if (transactionContext != null) { - future = op.invoke(transactionContext); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - final Promise promise = akka.dispatch.Futures.promise(); - addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(op.invoke(transactionContext)); - } - }); - - future = promise.future(); - } - - return future; - } - - CheckedFuture enqueueReadOperation(final ReadOperation op) { - - CheckedFuture future; - - if (transactionContext != null) { - future = op.invoke(transactionContext); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - final SettableFuture proxyFuture = SettableFuture.create(); - addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - Futures.addCallback(op.invoke(transactionContext), new FutureCallback() { - @Override - public void onSuccess(T data) { - proxyFuture.set(data); - } - - @Override - public void onFailure(Throwable t) { - proxyFuture.setException(t); - } - }); - } - }); - - future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); - } - - return future; - } - - void enqueueModifyOperation(final TransactionOperation op) { + void enqueueTransactionOperation(final TransactionOperation op) { if (transactionContext != null) { op.invoke(transactionContext);