Fix RemoteTransactionContext limiter accounting
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RemoteTransactionContext.java
index ba64e29d7542ec2027c4d6f1cbabd11d54bd5af0..7c45c542bd1422887c5c96cd9f53ce3ce81e9268 100644 (file)
@@ -9,15 +9,14 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
-import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -41,6 +40,15 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
     private BatchedModifications batchedModifications;
     private int totalBatchedModificationsSent;
+    private int batchPermits;
+
+    /**
+     * We have observed a failed modification batch. This transaction context is effectively doomed, as the backend
+     * does not have a correct view of the world. If this happens, we do not limit operations but rather short-cut them
+     * to either a no-op (modifications) or a failure (reads). Once the transaction is ready, though, we send the
+     * message to resynchronize with the backend, sharing a 'lost message' failure path.
+     */
+    private volatile Throwable failedModification;
 
     protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
             ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
@@ -50,11 +58,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         this.actorContext = actorContext;
     }
 
-    private Future<Object> completeOperation(Future<Object> operationFuture) {
-        operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
-        return operationFuture;
-    }
-
     private ActorSelection getActor() {
         return actor;
     }
@@ -63,10 +66,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         return actorContext;
     }
 
-    protected Future<Object> executeOperationAsync(SerializableMessage msg, Timeout timeout) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable(), timeout));
-    }
-
     @Override
     public void closeTransaction() {
         LOG.debug("Tx {} closeTransaction called", getIdentifier());
@@ -108,8 +107,12 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         return new BatchedModifications(getIdentifier(), getTransactionVersion());
     }
 
-    private void batchModification(Modification modification) {
+    private void batchModification(Modification modification, boolean havePermit) {
         incrementModificationCount();
+        if (havePermit) {
+            ++batchPermits;
+        }
+
         if (batchedModifications == null) {
             batchedModifications = newBatchedModifications();
         }
@@ -141,13 +144,39 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
 
             final BatchedModifications toSend = batchedModifications;
+            final int permitsToRelease = batchPermits;
+            batchPermits = 0;
+
             if (ready) {
                 batchedModifications = null;
             } else {
                 batchedModifications = newBatchedModifications();
+
+                final Throwable failure = failedModification;
+                if (failure != null) {
+                    // We have observed a modification failure, so it does not make sense to send this batch. This
+                    // shortens the time during which the application could be blocked due to messages timing out and
+                    // the operation limiter kicking in.
+                    LOG.debug("Tx {} modifications previously failed, not sending a non-ready batch", getIdentifier());
+                    limiter.release(permitsToRelease);
+                    return Futures.failed(failure);
+                }
             }
 
-            sent = executeOperationAsync(toSend, actorContext.getTransactionCommitOperationTimeout());
+            sent = actorContext.executeOperationAsync(getActor(), toSend.toSerializable(),
+                actorContext.getTransactionCommitOperationTimeout());
+            sent.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(Throwable failure, Object success) {
+                    if (failure != null) {
+                        LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
+                        failedModification = failure;
+                    } else {
+                        LOG.debug("Tx {} modifications completed with {}", getIdentifier(), success);
+                    }
+                    limiter.release(permitsToRelease);
+                }
+            }, actorContext.getClientDispatcher());
         }
 
         return sent;
@@ -158,8 +187,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
                 modification.getClass().getSimpleName(), modification.getPath());
 
-        acquireOperation();
-        batchModification(modification);
+        final boolean havePermit = failedModification == null && acquireOperation();
+        batchModification(modification, havePermit);
     }
 
     @Override
@@ -167,15 +196,30 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                 readCmd.getPath());
 
+        final Throwable failure = failedModification;
+        if (failure != null) {
+            // If we know there was a previous modification failure, we must not send a read request, as it risks
+            // returning incorrect data. We check this before acquiring an operation simply because we want the app
+            // to complete this transaction as soon as possible.
+            returnFuture.setException(new ReadFailedException("Previous modification failed, cannot "
+                    + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
+            return;
+        }
+
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
-        acquireOperation();
+        final boolean havePermit = acquireOperation();
         sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) throws Throwable {
+            public void onComplete(Throwable failure, Object response) {
+                // We have previously acquired an operation, now release it, no matter what happened
+                if (havePermit) {
+                    limiter.release();
+                }
+
                 if (failure != null) {
                     LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                             failure);
@@ -189,20 +233,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             }
         };
 
-        Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()),
-                actorContext.getOperationTimeout());
-
+        final Future<Object> future = actorContext.executeOperationAsync(getActor(),
+            readCmd.asVersion(getTransactionVersion()).toSerializable(), actorContext.getOperationTimeout());
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
     /**
-     * Acquire operation from the limiter if the hand-off has completed. If
-     * the hand-off is still ongoing, this method does nothing.
+     * Acquire operation from the limiter if the hand-off has completed. If the hand-off is still ongoing, this method
+     * does nothing.
+     *
+     * @return True if a permit was successfully acquired, false otherwise
      */
-    private void acquireOperation() {
-        if (isOperationHandOffComplete()) {
-            limiter.acquire();
-        }
+    private boolean acquireOperation() {
+        return isOperationHandOffComplete() && limiter.acquire();
     }
 
     @Override