Refactor TransactionProxy
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index d79cd6f69f4b0e3e4f1171a332b45c36adbbd515..5bc53442aeff04aa43c299a49890b3ecfe70b974 100644 (file)
@@ -18,8 +18,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -257,14 +255,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         throttleOperation();
 
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        return txFutureCallback.enqueueReadOperation(new ReadOperation<Optional<NormalizedNode<?, ?>>>() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> invoke(
-                    TransactionContext transactionContext) {
-                return transactionContext.readData(path);
+            public void invoke(TransactionContext transactionContext) {
+                transactionContext.readData(path, proxyFuture);
             }
         });
+
+        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
     @Override
@@ -277,15 +278,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         throttleOperation();
 
+        final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        return txFutureCallback.enqueueReadOperation(new ReadOperation<Boolean>() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public CheckedFuture<Boolean, ReadFailedException> invoke(TransactionContext transactionContext) {
-                return transactionContext.dataExists(path);
+            public void invoke(TransactionContext transactionContext) {
+                transactionContext.dataExists(path, proxyFuture);
             }
         });
-    }
 
+        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+    }
 
     private void checkModificationState() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
@@ -323,7 +327,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.writeData(path, data);
@@ -341,7 +345,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.mergeData(path, data);
@@ -359,7 +363,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.deleteData(path);
@@ -386,12 +390,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier,
                         txFutureCallback.getShardName(), transactionChainId);
 
-            Future<ActorSelection> future = txFutureCallback.enqueueFutureOperation(new FutureOperation<ActorSelection>() {
-                @Override
-                public Future<ActorSelection> invoke(TransactionContext transactionContext) {
-                    return transactionContext.readyTransaction();
-                }
-            });
+            final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+            final Future<ActorSelection> future;
+            if (transactionContext != null) {
+                // avoid the creation of a promise and a TransactionOperation
+                future = transactionContext.readyTransaction();
+            } else {
+                final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
+                txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                    @Override
+                    public void invoke(TransactionContext transactionContext) {
+                        promise.completeWith(transactionContext.readyTransaction());
+                    }
+                });
+                future = promise.future();
+            }
 
             cohortFutures.add(future);
         }
@@ -430,7 +443,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     @Override
     public void close() {
         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
                 @Override
                 public void invoke(TransactionContext transactionContext) {
                     transactionContext.closeTransaction();
@@ -492,20 +505,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         void invoke(TransactionContext transactionContext);
     }
 
-    /**
-     * This interface returns a Guava Future
-     */
-    private static interface ReadOperation<T> {
-        CheckedFuture<T, ReadFailedException> invoke(TransactionContext transactionContext);
-    }
-
-    /**
-     * This interface returns a Scala Future
-     */
-    private static interface FutureOperation<T> {
-        Future<T> invoke(TransactionContext transactionContext);
-    }
-
     /**
      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
@@ -582,64 +581,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-
-        <T> Future<T> enqueueFutureOperation(final FutureOperation<T> op) {
-
-            Future<T> future;
-
-            if (transactionContext != null) {
-                future = op.invoke(transactionContext);
-            } else {
-                // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-                // callback to be executed after the Tx is created.
-                final Promise<T> promise = akka.dispatch.Futures.promise();
-                addTxOperationOnComplete(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        promise.completeWith(op.invoke(transactionContext));
-                    }
-                });
-
-                future = promise.future();
-            }
-
-            return future;
-        }
-
-        <T> CheckedFuture<T, ReadFailedException> enqueueReadOperation(final ReadOperation<T> op) {
-
-            CheckedFuture<T, ReadFailedException> future;
-
-            if (transactionContext != null) {
-                future = op.invoke(transactionContext);
-            } else {
-                // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-                // callback to be executed after the Tx is created.
-                final SettableFuture<T> proxyFuture = SettableFuture.create();
-                addTxOperationOnComplete(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        Futures.addCallback(op.invoke(transactionContext), new FutureCallback<T>() {
-                            @Override
-                            public void onSuccess(T data) {
-                                proxyFuture.set(data);
-                            }
-
-                            @Override
-                            public void onFailure(Throwable t) {
-                                proxyFuture.setException(t);
-                            }
-                        });
-                    }
-                });
-
-                future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
-            }
-
-            return future;
-        }
-
-        void enqueueModifyOperation(final TransactionOperation op) {
+        void enqueueTransactionOperation(final TransactionOperation op) {
 
             if (transactionContext != null) {
                 op.invoke(transactionContext);