From 413bae822cdbf37f4dc16ebe14cab621953e817a Mon Sep 17 00:00:00 2001 From: Gary Wu Date: Thu, 29 Jan 2015 14:24:18 -0800 Subject: [PATCH] Refactor TransactionProxy Consolidated three transaction operation interfaces into one. Eliminated creation of redundant future/promise objects for queued read operations. Change-Id: I12a91cac5298f2722e30ed52ee91e35fdf1104d6 Signed-off-by: Gary Wu --- .../datastore/NoOpTransactionContext.java | 17 +-- .../cluster/datastore/TransactionContext.java | 10 +- .../datastore/TransactionContextImpl.java | 18 +-- .../cluster/datastore/TransactionProxy.java | 126 +++++------------- 4 files changed, 48 insertions(+), 123 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java index fc10b9c23b..84f07760f5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java @@ -9,8 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; import java.util.concurrent.Semaphore; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -63,20 +62,16 @@ final class NoOpTransactionContext extends AbstractTransactionContext { } @Override - public CheckedFuture>, ReadFailedException> readData( - YangInstanceIdentifier path) { + public void readData(final YangInstanceIdentifier path, SettableFuture>> proxyFuture) { LOG.debug("Tx {} readData called path = {}", identifier, path); operationLimiter.release(); - return Futures.immediateFailedCheckedFuture(new ReadFailedException( - "Error reading data for path " + path, failure)); + proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, failure)); } @Override - public CheckedFuture dataExists( - YangInstanceIdentifier path) { + public void dataExists(YangInstanceIdentifier path, SettableFuture proxyFuture) { LOG.debug("Tx {} dataExists called path = {}", identifier, path); operationLimiter.release(); - return Futures.immediateFailedCheckedFuture(new ReadFailedException( - "Error checking exists for path " + path, failure)); + proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure)); } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index b6af31e641..1b8e65e02d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -9,9 +9,8 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.SettableFuture; import java.util.List; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import scala.concurrent.Future; @@ -31,10 +30,9 @@ interface TransactionContext { void mergeData(YangInstanceIdentifier path, NormalizedNode data); - CheckedFuture>, ReadFailedException> readData( - final YangInstanceIdentifier path); + void readData(final YangInstanceIdentifier path, SettableFuture>> proxyFuture); - CheckedFuture dataExists(YangInstanceIdentifier path); + void dataExists(YangInstanceIdentifier path, SettableFuture proxyFuture); List> getRecordedOperationFutures(); -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index ce2c99ef52..530a36cff6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -12,7 +12,6 @@ import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import com.google.common.base.Optional; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.List; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; @@ -30,7 +29,6 @@ import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializa import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -179,13 +177,11 @@ final class TransactionContextImpl extends AbstractTransactionContext { } @Override - public CheckedFuture>, ReadFailedException> readData( - final YangInstanceIdentifier path) { + public void readData( + final YangInstanceIdentifier path,final SettableFuture>> returnFuture ) { LOG.debug("Tx {} readData called path = {}", identifier, path); - final SettableFuture>> returnFuture = SettableFuture.create(); - // If there were any previous recorded put/merge/delete operation reply Futures then we // must wait for them to successfully complete. This is necessary to honor the read // uncommitted semantics of the public API contract. If any one fails then fail the read. @@ -223,7 +219,6 @@ final class TransactionContextImpl extends AbstractTransactionContext { combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } - return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER); } private void finishReadData(final YangInstanceIdentifier path, @@ -264,13 +259,10 @@ final class TransactionContextImpl extends AbstractTransactionContext { } @Override - public CheckedFuture dataExists( - final YangInstanceIdentifier path) { + public void dataExists(final YangInstanceIdentifier path, final SettableFuture returnFuture) { LOG.debug("Tx {} dataExists called path = {}", identifier, path); - final SettableFuture returnFuture = SettableFuture.create(); - // If there were any previous recorded put/merge/delete operation reply Futures then we // must wait for them to successfully complete. This is necessary to honor the read // uncommitted semantics of the public API contract. If any one fails then fail this @@ -307,8 +299,6 @@ final class TransactionContextImpl extends AbstractTransactionContext { combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } - - return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER); } private void finishDataExists(final YangInstanceIdentifier path, @@ -344,4 +334,4 @@ final class TransactionContextImpl extends AbstractTransactionContext { future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index d79cd6f69f..5bc53442ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -18,8 +18,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; @@ -257,14 +255,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); + final SettableFuture>> proxyFuture = SettableFuture.create(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - return txFutureCallback.enqueueReadOperation(new ReadOperation>>() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override - public CheckedFuture>, ReadFailedException> invoke( - TransactionContext transactionContext) { - return transactionContext.readData(path); + public void invoke(TransactionContext transactionContext) { + transactionContext.readData(path, proxyFuture); } }); + + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } @Override @@ -277,15 +278,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); + final SettableFuture proxyFuture = SettableFuture.create(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - return txFutureCallback.enqueueReadOperation(new ReadOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override - public CheckedFuture invoke(TransactionContext transactionContext) { - return transactionContext.dataExists(path); + public void invoke(TransactionContext transactionContext) { + transactionContext.dataExists(path, proxyFuture); } }); - } + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); + } private void checkModificationState() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, @@ -323,7 +327,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.writeData(path, data); @@ -341,7 +345,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.mergeData(path, data); @@ -359,7 +363,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.deleteData(path); @@ -386,12 +390,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier, txFutureCallback.getShardName(), transactionChainId); - Future future = txFutureCallback.enqueueFutureOperation(new FutureOperation() { - @Override - public Future invoke(TransactionContext transactionContext) { - return transactionContext.readyTransaction(); - } - }); + final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + final Future future; + if (transactionContext != null) { + // avoid the creation of a promise and a TransactionOperation + future = transactionContext.readyTransaction(); + } else { + final Promise promise = akka.dispatch.Futures.promise(); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(transactionContext.readyTransaction()); + } + }); + future = promise.future(); + } cohortFutures.add(future); } @@ -430,7 +443,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void close() { for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - txFutureCallback.enqueueModifyOperation(new TransactionOperation() { + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.closeTransaction(); @@ -492,20 +505,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { void invoke(TransactionContext transactionContext); } - /** - * This interface returns a Guava Future - */ - private static interface ReadOperation { - CheckedFuture invoke(TransactionContext transactionContext); - } - - /** - * This interface returns a Scala Future - */ - private static interface FutureOperation { - Future invoke(TransactionContext transactionContext); - } - /** * Implements a Future OnComplete callback for a CreateTransaction message. This class handles * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a @@ -582,64 +581,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - - Future enqueueFutureOperation(final FutureOperation op) { - - Future future; - - if (transactionContext != null) { - future = op.invoke(transactionContext); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - final Promise promise = akka.dispatch.Futures.promise(); - addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(op.invoke(transactionContext)); - } - }); - - future = promise.future(); - } - - return future; - } - - CheckedFuture enqueueReadOperation(final ReadOperation op) { - - CheckedFuture future; - - if (transactionContext != null) { - future = op.invoke(transactionContext); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - final SettableFuture proxyFuture = SettableFuture.create(); - addTxOperationOnComplete(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - Futures.addCallback(op.invoke(transactionContext), new FutureCallback() { - @Override - public void onSuccess(T data) { - proxyFuture.set(data); - } - - @Override - public void onFailure(Throwable t) { - proxyFuture.setException(t); - } - }); - } - }); - - future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); - } - - return future; - } - - void enqueueModifyOperation(final TransactionOperation op) { + void enqueueTransactionOperation(final TransactionOperation op) { if (transactionContext != null) { op.invoke(transactionContext); -- 2.36.6