Fix RemoteTransactionContext limiter accounting 57/68757/6
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 9 Feb 2018 15:55:27 +0000 (16:55 +0100)
committerTom Pantelis <tompantelis@gmail.com>
Wed, 28 Feb 2018 16:09:16 +0000 (16:09 +0000)
In case we lose connectivity between the frontend and backend
at the early stages of a big transaction, e.g. after the transaction
is created at the backend and before it is submitted, we can run into
OperationLimiter preventing recovery.

The reason for this is that OperationLimiter itself does not know
how many permits a BatchedModification request contained, hence
on AskTimeoutException it would only decrement permits by one
and the operations would remain throttled. With large transactions
this means the application will suddenly become bogged down
by the OperationLimiter, preventing it from submitting the transaction
or otherwise recovering.

Once any BatchedModifications request fails, the transaction is
doomed anyway, as the message counts on frontend and backend will not
match. Furthermore we must not issue any reads -- the backend does
not have all the modifications, hence it could return an incorrect
result.

Move permit tracking to RemoteTransactionContext, where we can capture
the number of permits in the OnComplete that gets invoked, properly
returning permits which correspond to the BatchedModifications message.
If we have failed to acquire a permit, we also note that and do not
underflow the semaphore.

In case a BatchedModifications message fails, we mark that fact and
turn into a bypass mode: we fail any subsequent reads and do not send
any further BatchedModifications until we see ready being set -- at
which point we coordinate with backend to shoot down the transaction.

An alternative strategy would be to continue transmitting
BatchedModifications, but that would incur an AskTimeout during split,
increasing the time it takes us to kill and flush the doomed transaction
out of the system.

JIRA: CONTROLLER-1814
Change-Id: I919bae0e7173910665e8ec2342d076a710c1c7bf
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java [new file with mode: 0644]

index c035790..ea93f1f 100644 (file)
@@ -7,21 +7,18 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly
- * into akka to release permits as futures complete.
+ * Class for limiting operations.
  */
-public class OperationLimiter extends OnComplete<Object> {
+public class OperationLimiter  {
     private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class);
     private final TransactionIdentifier identifier;
     private final long acquireTimeout;
@@ -39,15 +36,17 @@ public class OperationLimiter extends OnComplete<Object> {
         this.semaphore = new Semaphore(maxPermits);
     }
 
-    void acquire() {
-        acquire(1);
+    boolean acquire() {
+        return acquire(1);
     }
 
-    void acquire(final int acquirePermits) {
+    boolean acquire(final int acquirePermits) {
         try {
-            if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
-                LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
+            if (semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
+                return true;
             }
+
+            LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
         } catch (InterruptedException e) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", identifier, e);
@@ -55,15 +54,16 @@ public class OperationLimiter extends OnComplete<Object> {
                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier);
             }
         }
+
+        return false;
     }
 
-    @Override
-    public void onComplete(final Throwable throwable, final Object message) {
-        if (message instanceof BatchedModificationsReply) {
-            this.semaphore.release(((BatchedModificationsReply)message).getNumBatched());
-        } else {
-            this.semaphore.release();
-        }
+    void release() {
+        release(1);
+    }
+
+    void release(int permits) {
+        this.semaphore.release(permits);
     }
 
     public TransactionIdentifier getIdentifier() {
index ba64e29..7c45c54 100644 (file)
@@ -9,15 +9,14 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
-import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -41,6 +40,15 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
     private BatchedModifications batchedModifications;
     private int totalBatchedModificationsSent;
+    private int batchPermits;
+
+    /**
+     * We have observed a failed modification batch. This transaction context is effectively doomed, as the backend
+     * does not have a correct view of the world. If this happens, we do not limit operations but rather short-cut them
+     * to either a no-op (modifications) or a failure (reads). Once the transaction is ready, though, we send the
+     * message to resynchronize with the backend, sharing a 'lost message' failure path.
+     */
+    private volatile Throwable failedModification;
 
     protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
             ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
@@ -50,11 +58,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         this.actorContext = actorContext;
     }
 
-    private Future<Object> completeOperation(Future<Object> operationFuture) {
-        operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
-        return operationFuture;
-    }
-
     private ActorSelection getActor() {
         return actor;
     }
@@ -63,10 +66,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         return actorContext;
     }
 
-    protected Future<Object> executeOperationAsync(SerializableMessage msg, Timeout timeout) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable(), timeout));
-    }
-
     @Override
     public void closeTransaction() {
         LOG.debug("Tx {} closeTransaction called", getIdentifier());
@@ -108,8 +107,12 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         return new BatchedModifications(getIdentifier(), getTransactionVersion());
     }
 
-    private void batchModification(Modification modification) {
+    private void batchModification(Modification modification, boolean havePermit) {
         incrementModificationCount();
+        if (havePermit) {
+            ++batchPermits;
+        }
+
         if (batchedModifications == null) {
             batchedModifications = newBatchedModifications();
         }
@@ -141,13 +144,39 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
 
             final BatchedModifications toSend = batchedModifications;
+            final int permitsToRelease = batchPermits;
+            batchPermits = 0;
+
             if (ready) {
                 batchedModifications = null;
             } else {
                 batchedModifications = newBatchedModifications();
+
+                final Throwable failure = failedModification;
+                if (failure != null) {
+                    // We have observed a modification failure, it does not make sense to send this batch. This
+                    // shortens the time during which the application could be blocked due to messages timing out
+                    // and the operation limiter kicking in.
+                    LOG.debug("Tx {} modifications previously failed, not sending a non-ready batch", getIdentifier());
+                    limiter.release(permitsToRelease);
+                    return Futures.failed(failure);
+                }
             }
 
-            sent = executeOperationAsync(toSend, actorContext.getTransactionCommitOperationTimeout());
+            sent = actorContext.executeOperationAsync(getActor(), toSend.toSerializable(),
+                actorContext.getTransactionCommitOperationTimeout());
+            sent.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(Throwable failure, Object success) {
+                    if (failure != null) {
+                        LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
+                        failedModification = failure;
+                    } else {
+                        LOG.debug("Tx {} modifications completed with {}", getIdentifier(), success);
+                    }
+                    limiter.release(permitsToRelease);
+                }
+            }, actorContext.getClientDispatcher());
         }
 
         return sent;
@@ -158,8 +187,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
                 modification.getClass().getSimpleName(), modification.getPath());
 
-        acquireOperation();
-        batchModification(modification);
+        final boolean havePermit = failedModification == null && acquireOperation();
+        batchModification(modification, havePermit);
     }
 
     @Override
@@ -167,15 +196,30 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                 readCmd.getPath());
 
+        final Throwable failure = failedModification;
+        if (failure != null) {
+            // If we know there was a previous modification failure, we must not send a read request, as it risks
+            // returning incorrect data. We check this before acquiring an operation simply because we want the app
+            // to complete this transaction as soon as possible.
+            returnFuture.setException(new ReadFailedException("Previous modification failed, cannot "
+                    + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
+            return;
+        }
+
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
-        acquireOperation();
+        final boolean havePermit = acquireOperation();
         sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Object response) throws Throwable {
+            public void onComplete(Throwable failure, Object response) {
+                // We have previously acquired an operation, now release it, no matter what happened
+                if (havePermit) {
+                    limiter.release();
+                }
+
                 if (failure != null) {
                     LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                             failure);
@@ -189,20 +233,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             }
         };
 
-        Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()),
-                actorContext.getOperationTimeout());
-
+        final Future<Object> future = actorContext.executeOperationAsync(getActor(),
+            readCmd.asVersion(getTransactionVersion()).toSerializable(), actorContext.getOperationTimeout());
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
     /**
-     * Acquire operation from the limiter if the hand-off has completed. If
-     * the hand-off is still ongoing, this method does nothing.
+     * Acquire operation from the limiter if the hand-off has completed. If the hand-off is still ongoing, this method
+     * does nothing.
+     *
+     * @return True if a permit was successfully acquired, false otherwise
      */
-    private void acquireOperation() {
-        if (isOperationHandOffComplete()) {
-            limiter.acquire();
-        }
+    private boolean acquireOperation() {
+        return isOperationHandOffComplete() && limiter.acquire();
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java
deleted file mode 100644 (file)
index d06a742..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-
-/**
- * Unit tests for OperationCompleter.
- *
- * @author Thomas Pantelis
- */
-public class OperationLimiterTest {
-    private final TransactionIdentifier transactionId = MockIdentifiers.transactionIdentifier(
-        OperationLimiterTest.class, "mock");
-
-    @Test
-    public void testOnComplete() throws Exception {
-        int permits = 10;
-        OperationLimiter limiter = new OperationLimiter(transactionId, permits, 1);
-        limiter.acquire(permits);
-        int availablePermits = 0;
-
-        limiter.onComplete(null, new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION));
-        assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
-
-        limiter.onComplete(null, new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION));
-        assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
-
-        limiter.onComplete(null, new IllegalArgumentException());
-        assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
-
-        limiter.onComplete(null, new BatchedModificationsReply(4));
-        availablePermits += 4;
-        assertEquals("availablePermits", availablePermits, limiter.availablePermits());
-    }
-
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java
new file mode 100644 (file)
index 0000000..526ad77
--- /dev/null
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import akka.actor.ActorRef;
+import akka.actor.Status.Failure;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.OnComplete;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Test whether RemoteTransactionContext operates correctly.
+ */
+public class RemoteTransactionContextTest extends AbstractActorTest {
+    private static final TransactionIdentifier TX_ID = new TransactionIdentifier(new LocalHistoryIdentifier(
+        ClientIdentifier.create(FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("test")), 0),
+        0), 0);
+    private static final DeleteModification DELETE = new DeleteModification(DataStoreVersions.CURRENT_VERSION);
+
+    private OperationLimiter limiter;
+    private RemoteTransactionContext txContext;
+    private ActorContext actorContext;
+    private TestKit kit;
+
+    @Before
+    public void before() {
+        kit = new TestKit(getSystem());
+        actorContext = Mockito.spy(new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+            mock(Configuration.class)));
+        limiter = new OperationLimiter(TX_ID, 4, 0);
+        txContext = new RemoteTransactionContext(TX_ID, actorContext.actorSelection(kit.getRef().path()), actorContext,
+            DataStoreVersions.CURRENT_VERSION, limiter);
+        txContext.operationHandOffComplete();
+    }
+
+    /**
+     * OperationLimiter should be correctly released when a failure, like AskTimeoutException occurs. Future reads
+     * need to complete immediately with the failure and modifications should not be throttled, but rather thrown
+     * away immediately.
+     */
+    @Test
+    public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
+        txContext.executeModification(DELETE);
+        txContext.executeModification(DELETE);
+        assertEquals(2, limiter.availablePermits());
+
+        Future<Object> future = txContext.sendBatchedModifications();
+        assertEquals(2, limiter.availablePermits());
+
+        BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
+        assertEquals(2, msg.getModifications().size());
+        assertEquals(1, msg.getTotalMessagesSent());
+        sendReply(new Failure(new NullPointerException()));
+        assertFuture(future, new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) {
+                assertTrue(failure instanceof NullPointerException);
+                assertEquals(4, limiter.availablePermits());
+
+                // The transaction has failed, no throttling should occur
+                txContext.executeModification(DELETE);
+                assertEquals(4, limiter.availablePermits());
+
+                // Executing a read should result in immediate failure
+                final SettableFuture<Boolean> readFuture = SettableFuture.create();
+                txContext.executeRead(new DataExists(), readFuture);
+                assertTrue(readFuture.isDone());
+                try {
+                    readFuture.get();
+                    fail("Read future did not fail");
+                } catch (ExecutionException | InterruptedException e) {
+                    assertTrue(e.getCause() instanceof NullPointerException);
+                }
+            }
+        });
+
+        future = txContext.directCommit();
+
+        msg = kit.expectMsgClass(BatchedModifications.class);
+        // Modification should have been thrown away by the dropped transmit induced by executeRead()
+        assertEquals(0, msg.getModifications().size());
+        assertTrue(msg.isDoCommitOnReady());
+        assertTrue(msg.isReady());
+        assertEquals(2, msg.getTotalMessagesSent());
+        sendReply(new Failure(new IllegalStateException()));
+        assertFuture(future, new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) {
+                assertTrue(failure instanceof IllegalStateException);
+            }
+        });
+
+        kit.expectNoMsg();
+    }
+
+    /**
+     * OperationLimiter gives up throttling at some point -- {@link RemoteTransactionContext} needs to deal with that
+     * case, too.
+     */
+    @Test
+    public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
+        txContext.executeModification(DELETE);
+        txContext.executeModification(DELETE);
+        txContext.executeModification(DELETE);
+        txContext.executeModification(DELETE);
+        assertEquals(0, limiter.availablePermits());
+        txContext.executeModification(DELETE);
+        // Last acquire should have failed ...
+        assertEquals(0, limiter.availablePermits());
+
+        Future<Object> future = txContext.sendBatchedModifications();
+        assertEquals(0, limiter.availablePermits());
+
+        BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
+        // ... so we are sending 5 modifications ...
+        assertEquals(5, msg.getModifications().size());
+        assertEquals(1, msg.getTotalMessagesSent());
+        sendReply(new Failure(new NullPointerException()));
+
+        assertFuture(future, new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) {
+                assertTrue(failure instanceof NullPointerException);
+                // ... but they account for only 4 permits.
+                assertEquals(4, limiter.availablePermits());
+            }
+        });
+
+        kit.expectNoMsg();
+    }
+
+    private void sendReply(final Object message) {
+        final ActorRef askActor = kit.getLastSender(); // temporary ask actor awaiting the reply
+        kit.watch(askActor);
+        kit.reply(message); // forward the caller-supplied reply instead of a hard-coded IllegalStateException
+        kit.expectTerminated(askActor); // wait for the ask actor to stop before the test proceeds
+    }
+
+    private static void assertFuture(final Future<Object> future, final OnComplete<Object> complete)
+            throws TimeoutException, InterruptedException {
+        Await.ready(future, FiniteDuration.apply(3, TimeUnit.SECONDS));
+        future.onComplete(complete, ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()));
+    }
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.