Merge "BUG 2412 - restconf @GET getModule(identifier,uri) method migration"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index f28a1e5f73d4823fe31ba20d14fa4d986f9700da..58b37be2a2727babd9b0305e868d85d90c079052 100644 (file)
@@ -18,9 +18,9 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -255,14 +255,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         throttleOperation();
 
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        return txFutureCallback.enqueueReadOperation(new ReadOperation<Optional<NormalizedNode<?, ?>>>() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> invoke(
-                    TransactionContext transactionContext) {
-                return transactionContext.readData(path);
+            public void invoke(TransactionContext transactionContext) {
+                transactionContext.readData(path, proxyFuture);
             }
         });
+
+        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
     @Override
@@ -275,15 +278,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         throttleOperation();
 
+        final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        return txFutureCallback.enqueueReadOperation(new ReadOperation<Boolean>() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public CheckedFuture<Boolean, ReadFailedException> invoke(TransactionContext transactionContext) {
-                return transactionContext.dataExists(path);
+            public void invoke(TransactionContext transactionContext) {
+                transactionContext.dataExists(path, proxyFuture);
             }
         });
-    }
 
+        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+    }
 
     private void checkModificationState() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
@@ -298,7 +304,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     private void throttleOperation(int acquirePermits) {
         try {
-            if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+            if(!operationLimiter.tryAcquire(acquirePermits,
+                    actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
             }
         } catch (InterruptedException e) {
@@ -321,7 +328,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.writeData(path, data);
@@ -339,7 +346,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.mergeData(path, data);
@@ -357,7 +364,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.deleteData(path);
@@ -384,12 +391,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier,
                         txFutureCallback.getShardName(), transactionChainId);
 
-            Future<ActorSelection> future = txFutureCallback.enqueueFutureOperation(new FutureOperation<ActorSelection>() {
-                @Override
-                public Future<ActorSelection> invoke(TransactionContext transactionContext) {
-                    return transactionContext.readyTransaction();
-                }
-            });
+            final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+            final Future<ActorSelection> future;
+            if (transactionContext != null) {
+                // avoid the creation of a promise and a TransactionOperation
+                future = transactionContext.readyTransaction();
+            } else {
+                final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
+                txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                    @Override
+                    public void invoke(TransactionContext transactionContext) {
+                        promise.completeWith(transactionContext.readyTransaction());
+                    }
+                });
+                future = promise.future();
+            }
 
             cohortFutures.add(future);
         }
@@ -428,7 +444,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     @Override
     public void close() {
         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
                 @Override
                 public void invoke(TransactionContext transactionContext) {
                     transactionContext.closeTransaction();
@@ -469,7 +485,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         newTxFutureCallback.setPrimaryShard(primaryShard);
                     }
                 }
-            }, actorContext.getActorSystem().dispatcher());
+            }, actorContext.getClientDispatcher());
         }
 
         return txFutureCallback;
@@ -490,20 +506,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         void invoke(TransactionContext transactionContext);
     }
 
-    /**
-     * This interface returns a Guava Future
-     */
-    private static interface ReadOperation<T> {
-        CheckedFuture<T, ReadFailedException> invoke(TransactionContext transactionContext);
-    }
-
-    /**
-     * This interface returns a Scala Future
-     */
-    private static interface FutureOperation<T> {
-        Future<T> invoke(TransactionContext transactionContext);
-    }
-
     /**
      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
@@ -565,75 +567,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Adds a TransactionOperation to be executed after the CreateTransaction completes.
          */
         void addTxOperationOnComplete(TransactionOperation operation) {
+            boolean invokeOperation = true;
             synchronized(txOperationsOnComplete) {
                 if(transactionContext == null) {
                     LOG.debug("Tx {} Adding operation on complete {}", identifier);
 
+                    invokeOperation = false;
                     txOperationsOnComplete.add(operation);
-                } else {
-                    operation.invoke(transactionContext);
                 }
             }
-        }
-
-
-        <T> Future<T> enqueueFutureOperation(final FutureOperation<T> op) {
-
-            Future<T> future;
-
-            if (transactionContext != null) {
-                future = op.invoke(transactionContext);
-            } else {
-                // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-                // callback to be executed after the Tx is created.
-                final Promise<T> promise = akka.dispatch.Futures.promise();
-                addTxOperationOnComplete(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        promise.completeWith(op.invoke(transactionContext));
-                    }
-                });
-
-                future = promise.future();
-            }
-
-            return future;
-        }
-
-        <T> CheckedFuture<T, ReadFailedException> enqueueReadOperation(final ReadOperation<T> op) {
-
-            CheckedFuture<T, ReadFailedException> future;
-
-            if (transactionContext != null) {
-                future = op.invoke(transactionContext);
-            } else {
-                // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-                // callback to be executed after the Tx is created.
-                final SettableFuture<T> proxyFuture = SettableFuture.create();
-                addTxOperationOnComplete(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        Futures.addCallback(op.invoke(transactionContext), new FutureCallback<T>() {
-                            @Override
-                            public void onSuccess(T data) {
-                                proxyFuture.set(data);
-                            }
-
-                            @Override
-                            public void onFailure(Throwable t) {
-                                proxyFuture.setException(t);
-                            }
-                        });
-                    }
-                });
 
-                future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+            if(invokeOperation) {
+                operation.invoke(transactionContext);
             }
-
-            return future;
         }
 
-        void enqueueModifyOperation(final TransactionOperation op) {
+        void enqueueTransactionOperation(final TransactionOperation op) {
 
             if (transactionContext != null) {
                 op.invoke(transactionContext);
@@ -653,7 +602,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                             TransactionProxy.this.transactionType.ordinal(),
                             getTransactionChainId()).toSerializable());
 
-            createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher());
+            createTxFuture.onComplete(this, actorContext.getClientDispatcher());
         }
 
         @Override
@@ -673,55 +622,75 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                                 public void run() {
                                     tryCreateTransaction();
                                 }
-                            }, actorContext.getActorSystem().dispatcher());
+                            }, actorContext.getClientDispatcher());
                     return;
                 }
             }
 
-            // Create the TransactionContext from the response or failure and execute delayed
-            // TransactionOperations. This entire section is done atomically (ie synchronized) with
-            // respect to #addTxOperationOnComplete to handle timing issues and ensure no
-            // TransactionOperation is missed and that they are processed in the order they occurred.
-            synchronized(txOperationsOnComplete) {
-                // Store the new TransactionContext locally until we've completed invoking the
-                // TransactionOperations. This avoids thread timing issues which could cause
-                // out-of-order TransactionOperations. Eg, on a modification operation, if the
-                // TransactionContext is non-null, then we directly call the TransactionContext.
-                // However, at the same time, the code may be executing the cached
-                // TransactionOperations. So to avoid thus timing, we don't publish the
-                // TransactionContext until after we've executed all cached TransactionOperations.
-                TransactionContext localTransactionContext;
-                if(failure != null) {
-                    LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
-                            failure.getMessage());
-
-                    localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
-                } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
-                    localTransactionContext = createValidTransactionContext(
-                            CreateTransactionReply.fromSerializable(response));
-                } else {
-                    IllegalArgumentException exception = new IllegalArgumentException(String.format(
+            // Create the TransactionContext from the response or failure. Store the new
+            // TransactionContext locally until we've completed invoking the
+            // TransactionOperations. This avoids thread timing issues which could cause
+            // out-of-order TransactionOperations. Eg, on a modification operation, if the
+            // TransactionContext is non-null, then we directly call the TransactionContext.
+            // However, at the same time, the code may be executing the cached
+            // TransactionOperations. So to avoid thus timing, we don't publish the
+            // TransactionContext until after we've executed all cached TransactionOperations.
+            TransactionContext localTransactionContext;
+            if(failure != null) {
+                LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
+                        failure.getMessage());
+
+                localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
+            } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+                localTransactionContext = createValidTransactionContext(
+                        CreateTransactionReply.fromSerializable(response));
+            } else {
+                IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
+                localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
+            }
+
+            executeTxOperatonsOnComplete(localTransactionContext);
+        }
+
+        private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
+            while(true) {
+                // Access to txOperationsOnComplete and transactionContext must be protected and atomic
+                // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
+                // issues and ensure no TransactionOperation is missed and that they are processed
+                // in the order they occurred.
+
+                // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
+                // in case a TransactionOperation results in another transaction operation being
+                // queued (eg a put operation from a client read Future callback that is notified
+                // synchronously).
+                Collection<TransactionOperation> operationsBatch = null;
+                synchronized(txOperationsOnComplete) {
+                    if(txOperationsOnComplete.isEmpty()) {
+                        // We're done invoking the TransactionOperations so we can now publish the
+                        // TransactionContext.
+                        transactionContext = localTransactionContext;
+                        break;
+                    }
+
+                    operationsBatch = new ArrayList<>(txOperationsOnComplete);
+                    txOperationsOnComplete.clear();
                 }
 
-                for(TransactionOperation oper: txOperationsOnComplete) {
+                // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
+                // A slight down-side is that we need to re-acquire the lock below but this should
+                // be negligible.
+                for(TransactionOperation oper: operationsBatch) {
                     oper.invoke(localTransactionContext);
                 }
-
-                txOperationsOnComplete.clear();
-
-                // We're done invoking the TransactionOperations so we can now publish the
-                // TransactionContext.
-                transactionContext = localTransactionContext;
             }
         }
 
         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
             String transactionPath = reply.getTransactionPath();
 
-            LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
+            LOG.debug("Tx {} Received {}", identifier, reply);
 
             ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
 
@@ -739,8 +708,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
-            return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
+                return new TransactionContextImpl(transactionPath, transactionActor, identifier,
+                    actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            } else {
+                return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier,
+                        actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            }
         }
     }
 }