From e983d61d93fe2da50f9c4112fa28c7fe4ee5ffef Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 May 2019 19:42:26 +0200 Subject: [PATCH] Poison entries outside of main lock Poisoning entries may involve reaction from their callbacks, which can attempt to circle back through connections. Make sure we poison them outside of lock context, so that any callbacks end up seeing a poisoned connection, but without the lock being held -- hence the locks can be acquired in-order. JIRA: CONTROLLER-1893 Change-Id: I26551d052307812e76f3e45024a77dbb83312b17 Signed-off-by: Robert Varga --- .../client/AbstractClientConnection.java | 71 ++++++++++++------- .../cluster/access/client/TransmitQueue.java | 22 +++--- .../client/AbstractTransmitQueueTest.java | 33 ++++----- 3 files changed, 68 insertions(+), 58 deletions(-) diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 361027af1d..0d45dd5c69 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -15,6 +15,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -316,9 +317,10 @@ public abstract class AbstractClientConnection { */ @VisibleForTesting final ClientActorBehavior runTimer(final ClientActorBehavior current) { - final Optional delay; - lock.lock(); + + final List poisonEntries; + final 
NoProgressException poisonCause; try { haveTimer = false; final long now = currentTime(); @@ -328,36 +330,38 @@ public abstract class AbstractClientConnection { // The following line is only reliable when queue is not forwarding, but such state should not last long. // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries final long ticksSinceProgress = queue.ticksStalling(now); - if (ticksSinceProgress >= context.config().getNoProgressTimeout()) { - LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, - TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress)); + if (ticksSinceProgress < context.config().getNoProgressTimeout()) { + // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward. + // Note we also inquire about the delay, so we can re-schedule if needed, hence the unusual + // tri-state return convention. + final Optional delay = lockedCheckTimeout(now); + if (delay == null) { + // We have timed out. There is no point in scheduling a timer + LOG.debug("{}: connection {} timed out", context.persistenceId(), this); + return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out", + new TimeoutException())); + } - lockedPoison(new NoProgressException(ticksSinceProgress)); - current.removeConnection(this); - return current; - } + if (delay.isPresent()) { + // If there is a new delay, schedule a timer + scheduleTimer(delay.get()); + } else { + LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this); + } - // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward. - // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state - // return convention. - delay = lockedCheckTimeout(now); - if (delay == null) { - // We have timed out. 
There is no point in scheduling a timer - LOG.debug("{}: connection {} timed out", context.persistenceId(), this); - return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out", - new TimeoutException())); + return current; } - if (delay.isPresent()) { - // If there is new delay, schedule a timer - scheduleTimer(delay.get()); - } else { - LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this); - } + LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, + TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress)); + poisonCause = new NoProgressException(ticksSinceProgress); + poisonEntries = lockedPoison(poisonCause); + current.removeConnection(this); } finally { lock.unlock(); } + poison(poisonEntries, poisonCause); return current; } @@ -435,18 +439,31 @@ public abstract class AbstractClientConnection { } final void poison(final RequestException cause) { + final List entries; + lock.lock(); try { - lockedPoison(cause); + entries = lockedPoison(cause); } finally { lock.unlock(); } + + poison(entries, cause); + } + + // Do not hold any locks while calling this + private static void poison(final Collection entries, final RequestException cause) { + for (ConnectionEntry e : entries) { + final Request request = e.getRequest(); + LOG.trace("Poisoning request {}", request, cause); + e.complete(request.toRequestFailure(cause)); + } } @Holding("lock") - private void lockedPoison(final RequestException cause) { + private List lockedPoison(final RequestException cause) { poisoned = enrichPoison(cause); - queue.poison(cause); + return queue.poison(); } RequestException enrichPoison(final RequestException ex) { diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index be14d059ce..e4ee78c539 100644 
--- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -13,14 +13,15 @@ import com.google.common.base.Preconditions; import com.google.common.base.Verify; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.Deque; import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.Queue; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; -import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; @@ -336,9 +337,13 @@ abstract class TransmitQueue { return pending.peek(); } - final void poison(final RequestException cause) { - poisonQueue(inflight, cause); - poisonQueue(pending, cause); + final List poison() { + final List entries = new ArrayList<>(inflight.size() + pending.size()); + entries.addAll(inflight); + inflight.clear(); + entries.addAll(pending); + pending.clear(); + return entries; } final void setForwarder(final ReconnectForwarder forwarder, final long now) { @@ -442,13 +447,4 @@ abstract class TransmitQueue { return null; } - - private static void poisonQueue(final Queue queue, final RequestException cause) { - for (ConnectionEntry e : queue) { - final Request request = e.getRequest(); - LOG.trace("Poisoning request {}", request, cause); - e.complete(request.toRequestFailure(cause)); - } - queue.clear(); - } } diff --git 
a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java index 69788e5da1..7bebd7ac74 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java @@ -8,9 +8,11 @@ package org.opendaylight.controller.cluster.access.client; import static org.hamcrest.CoreMatchers.everyItem; -import static org.mockito.Matchers.any; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest; import akka.actor.ActorSystem; @@ -21,10 +23,8 @@ import java.util.Collection; import java.util.Optional; import java.util.function.Consumer; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.opendaylight.controller.cluster.access.commands.TransactionFailure; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; @@ -35,7 +35,6 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.Response; -import 
org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -82,14 +81,14 @@ public abstract class AbstractTransmitQueueTest { queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); } final Collection entries = queue.drain(); - Assert.assertEquals(sentMessages, entries.size()); - Assert.assertThat(entries, everyItem(entryWithRequest(request))); + assertEquals(sentMessages, entries.size()); + assertThat(entries, everyItem(entryWithRequest(request))); } @Test public void testTicksStalling() { final long now = Ticker.systemTicker().read(); - Assert.assertEquals(0, queue.ticksStalling(now)); + assertEquals(0, queue.ticksStalling(now)); } @Test @@ -106,38 +105,38 @@ public abstract class AbstractTransmitQueueTest { final RequestSuccess success1 = new TransactionPurgeResponse(anotherTxId, requestSequence); final Optional completed1 = queue.complete(new SuccessEnvelope(success1, sessionId, txSequence, 1L), now); - Assert.assertFalse(completed1.isPresent()); + assertFalse(completed1.isPresent()); //different response sequence final long differentResponseSequence = 1L; final RequestSuccess success2 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, differentResponseSequence); final Optional completed2 = queue.complete(new SuccessEnvelope(success2, sessionId, txSequence, 1L), now); - Assert.assertFalse(completed2.isPresent()); + assertFalse(completed2.isPresent()); //different tx sequence final long differentTxSequence = 1L; final RequestSuccess success3 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence); final Optional completed3 = queue.complete(new SuccessEnvelope(success3, sessionId, differentTxSequence, 1L), now); - Assert.assertFalse(completed3.isPresent()); + assertFalse(completed3.isPresent()); //different session id final long differentSessionId = 
1L; final RequestSuccess success4 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence); final Optional completed4 = queue.complete(new SuccessEnvelope(success4, differentSessionId, differentTxSequence, 1L), now); - Assert.assertFalse(completed4.isPresent()); + assertFalse(completed4.isPresent()); } @Test public void testIsEmpty() { - Assert.assertTrue(queue.isEmpty()); + assertTrue(queue.isEmpty()); final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); - Assert.assertFalse(queue.isEmpty()); + assertFalse(queue.isEmpty()); } @Test @@ -150,7 +149,7 @@ public abstract class AbstractTransmitQueueTest { final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now); queue.enqueueOrForward(entry1, now); queue.enqueueOrForward(entry2, now); - Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest()); + assertEquals(entry1.getRequest(), queue.peek().getRequest()); } @Test @@ -159,9 +158,7 @@ public abstract class AbstractTransmitQueueTest { final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); - queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail"))); - verify(callback).accept(any(TransactionFailure.class)); - Assert.assertTrue(queue.isEmpty()); + assertEquals(1, queue.poison().size()); } @SuppressWarnings("unchecked") -- 2.36.6