Refactor TransactionProxy 18/14618/3
authorGary Wu <Gary.Wu1@huawei.com>
Thu, 29 Jan 2015 22:24:18 +0000 (14:24 -0800)
committerGary Wu <Gary.Wu1@huawei.com>
Mon, 2 Feb 2015 19:23:04 +0000 (19:23 +0000)
Consolidated three transaction operation interfaces
into one.  Eliminated creation of redundant future/promise
objects for queued read operations.

Change-Id: I12a91cac5298f2722e30ed52ee91e35fdf1104d6
Signed-off-by: Gary Wu <Gary.Wu1@huawei.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java

index fc10b9c..84f0776 100644 (file)
@@ -9,8 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Semaphore;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -63,20 +62,16 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
-            YangInstanceIdentifier path) {
+    public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         LOG.debug("Tx {} readData called path = {}", identifier, path);
         operationLimiter.release();
-        return Futures.immediateFailedCheckedFuture(new ReadFailedException(
-                "Error reading data for path " + path, failure));
+        proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, failure));
     }
 
     @Override
-    public CheckedFuture<Boolean, ReadFailedException> dataExists(
-            YangInstanceIdentifier path) {
+    public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
         LOG.debug("Tx {} dataExists called path = {}", identifier, path);
         operationLimiter.release();
-        return Futures.immediateFailedCheckedFuture(new ReadFailedException(
-                "Error checking exists for path " + path, failure));
+        proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
     }
-}
\ No newline at end of file
+}
index b6af31e..1b8e65e 100644 (file)
@@ -9,9 +9,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.Future;
@@ -31,10 +30,9 @@ interface TransactionContext {
 
     void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
-    CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
-            final YangInstanceIdentifier path);
+    void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture);
 
-    CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
+    void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
 
     List<Future<Object>> getRecordedOperationFutures();
-}
\ No newline at end of file
+}
index ce2c99e..530a36c 100644 (file)
@@ -12,7 +12,6 @@ import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
@@ -30,7 +29,6 @@ import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializa
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -179,13 +177,11 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     }
 
     @Override
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
-            final YangInstanceIdentifier path) {
+    public void readData(
+            final YangInstanceIdentifier path,final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
 
         LOG.debug("Tx {} readData called path = {}", identifier, path);
 
-        final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
-
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail the read.
@@ -223,7 +219,6 @@ final class TransactionContextImpl extends AbstractTransactionContext {
             combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
 
-        return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
     }
 
     private void finishReadData(final YangInstanceIdentifier path,
@@ -264,13 +259,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     }
 
     @Override
-    public CheckedFuture<Boolean, ReadFailedException> dataExists(
-            final YangInstanceIdentifier path) {
+    public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> returnFuture) {
 
         LOG.debug("Tx {} dataExists called path = {}", identifier, path);
 
-        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
-
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail this
@@ -307,8 +299,6 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
             combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
-
-        return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
     }
 
     private void finishDataExists(final YangInstanceIdentifier path,
@@ -344,4 +334,4 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
     }
-}
\ No newline at end of file
+}
index d79cd6f..5bc5344 100644 (file)
@@ -18,8 +18,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -257,14 +255,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         throttleOperation();
 
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        return txFutureCallback.enqueueReadOperation(new ReadOperation<Optional<NormalizedNode<?, ?>>>() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> invoke(
-                    TransactionContext transactionContext) {
-                return transactionContext.readData(path);
+            public void invoke(TransactionContext transactionContext) {
+                transactionContext.readData(path, proxyFuture);
             }
         });
+
+        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
     @Override
@@ -277,15 +278,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         throttleOperation();
 
+        final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        return txFutureCallback.enqueueReadOperation(new ReadOperation<Boolean>() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public CheckedFuture<Boolean, ReadFailedException> invoke(TransactionContext transactionContext) {
-                return transactionContext.dataExists(path);
+            public void invoke(TransactionContext transactionContext) {
+                transactionContext.dataExists(path, proxyFuture);
             }
         });
-    }
 
+        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+    }
 
     private void checkModificationState() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
@@ -323,7 +327,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.writeData(path, data);
@@ -341,7 +345,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.mergeData(path, data);
@@ -359,7 +363,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.deleteData(path);
@@ -386,12 +390,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier,
                         txFutureCallback.getShardName(), transactionChainId);
 
-            Future<ActorSelection> future = txFutureCallback.enqueueFutureOperation(new FutureOperation<ActorSelection>() {
-                @Override
-                public Future<ActorSelection> invoke(TransactionContext transactionContext) {
-                    return transactionContext.readyTransaction();
-                }
-            });
+            final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+            final Future<ActorSelection> future;
+            if (transactionContext != null) {
+                // avoid the creation of a promise and a TransactionOperation
+                future = transactionContext.readyTransaction();
+            } else {
+                final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
+                txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                    @Override
+                    public void invoke(TransactionContext transactionContext) {
+                        promise.completeWith(transactionContext.readyTransaction());
+                    }
+                });
+                future = promise.future();
+            }
 
             cohortFutures.add(future);
         }
@@ -430,7 +443,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     @Override
     public void close() {
         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
                 @Override
                 public void invoke(TransactionContext transactionContext) {
                     transactionContext.closeTransaction();
@@ -492,20 +505,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         void invoke(TransactionContext transactionContext);
     }
 
-    /**
-     * This interface returns a Guava Future
-     */
-    private static interface ReadOperation<T> {
-        CheckedFuture<T, ReadFailedException> invoke(TransactionContext transactionContext);
-    }
-
-    /**
-     * This interface returns a Scala Future
-     */
-    private static interface FutureOperation<T> {
-        Future<T> invoke(TransactionContext transactionContext);
-    }
-
     /**
      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
@@ -582,64 +581,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-
-        <T> Future<T> enqueueFutureOperation(final FutureOperation<T> op) {
-
-            Future<T> future;
-
-            if (transactionContext != null) {
-                future = op.invoke(transactionContext);
-            } else {
-                // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-                // callback to be executed after the Tx is created.
-                final Promise<T> promise = akka.dispatch.Futures.promise();
-                addTxOperationOnComplete(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        promise.completeWith(op.invoke(transactionContext));
-                    }
-                });
-
-                future = promise.future();
-            }
-
-            return future;
-        }
-
-        <T> CheckedFuture<T, ReadFailedException> enqueueReadOperation(final ReadOperation<T> op) {
-
-            CheckedFuture<T, ReadFailedException> future;
-
-            if (transactionContext != null) {
-                future = op.invoke(transactionContext);
-            } else {
-                // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-                // callback to be executed after the Tx is created.
-                final SettableFuture<T> proxyFuture = SettableFuture.create();
-                addTxOperationOnComplete(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        Futures.addCallback(op.invoke(transactionContext), new FutureCallback<T>() {
-                            @Override
-                            public void onSuccess(T data) {
-                                proxyFuture.set(data);
-                            }
-
-                            @Override
-                            public void onFailure(Throwable t) {
-                                proxyFuture.setException(t);
-                            }
-                        });
-                    }
-                });
-
-                future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
-            }
-
-            return future;
-        }
-
-        void enqueueModifyOperation(final TransactionOperation op) {
+        void enqueueTransactionOperation(final TransactionOperation op) {
 
             if (transactionContext != null) {
                 op.invoke(transactionContext);