Poisoning entries may involve reaction from their callbacks, which
can attempt to circle back through connections.
Make sure we poison them outside of lock context, so that any
callbacks end up seeing a poisoned connection, but without the lock
being held -- hence the locks can be acquired in-order.
JIRA: CONTROLLER-1893
Change-Id: I26551d052307812e76f3e45024a77dbb83312b17
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
e983d61d93fe2da50f9c4112fa28c7fe4ee5ffef)
import com.google.common.base.MoreObjects.ToStringHelper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collection;
import com.google.common.base.MoreObjects.ToStringHelper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
*/
@VisibleForTesting
final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
*/
@VisibleForTesting
final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
- final Optional<Long> delay;
-
+
+ final List<ConnectionEntry> poisonEntries;
+ final NoProgressException poisonCause;
try {
haveTimer = false;
final long now = currentTime();
try {
haveTimer = false;
final long now = currentTime();
// The following line is only reliable when queue is not forwarding, but such state should not last long.
// FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
final long ticksSinceProgress = queue.ticksStalling(now);
// The following line is only reliable when queue is not forwarding, but such state should not last long.
// FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
final long ticksSinceProgress = queue.ticksStalling(now);
- if (ticksSinceProgress >= context.config().getNoProgressTimeout()) {
- LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
- TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+ if (ticksSinceProgress < context.config().getNoProgressTimeout()) {
+ // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
+ // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual
+ // tri-state return convention.
+ final Optional<Long> delay = lockedCheckTimeout(now);
+ if (delay == null) {
+ // We have timed out. There is no point in scheduling a timer
+ LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
+ return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
+ new TimeoutException()));
+ }
- lockedPoison(new NoProgressException(ticksSinceProgress));
- current.removeConnection(this);
- return current;
- }
+ if (delay.isPresent()) {
+ // If there is new delay, schedule a timer
+ scheduleTimer(delay.get());
+ } else {
+ LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
+ }
- // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
- // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
- // return convention.
- delay = lockedCheckTimeout(now);
- if (delay == null) {
- // We have timed out. There is no point in scheduling a timer
- LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
- return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
- new TimeoutException()));
- if (delay.isPresent()) {
- // If there is new delay, schedule a timer
- scheduleTimer(delay.get());
- } else {
- LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
- }
+ LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+ TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+ poisonCause = new NoProgressException(ticksSinceProgress);
+ poisonEntries = lockedPoison(poisonCause);
+ current.removeConnection(this);
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
+ poison(poisonEntries, poisonCause);
}
final void poison(final RequestException cause) {
}
final void poison(final RequestException cause) {
+ final List<ConnectionEntry> entries;
+
+ entries = lockedPoison(cause);
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
+
+ poison(entries, cause);
+ }
+
+ // Do not hold any locks while calling this
+ private static void poison(final Collection<? extends ConnectionEntry> entries, final RequestException cause) {
+ for (ConnectionEntry e : entries) {
+ final Request<?, ?> request = e.getRequest();
+ LOG.trace("Poisoning request {}", request, cause);
+ e.complete(request.toRequestFailure(cause));
+ }
- private void lockedPoison(final RequestException cause) {
+ private List<ConnectionEntry> lockedPoison(final RequestException cause) {
poisoned = enrichPoison(cause);
poisoned = enrichPoison(cause);
}
RequestException enrichPoison(final RequestException ex) {
}
RequestException enrichPoison(final RequestException ex) {
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import java.util.Optional;
import java.util.Queue;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
- final void poison(final RequestException cause) {
- poisonQueue(inflight, cause);
- poisonQueue(pending, cause);
+ final List<ConnectionEntry> poison() {
+ final List<ConnectionEntry> entries = new ArrayList<>(inflight.size() + pending.size());
+ entries.addAll(inflight);
+ inflight.clear();
+ entries.addAll(pending);
+ pending.clear();
+ return entries;
}
final void setForwarder(final ReconnectForwarder forwarder, final long now) {
}
final void setForwarder(final ReconnectForwarder forwarder, final long now) {
-
- private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
- for (ConnectionEntry e : queue) {
- final Request<?, ?> request = e.getRequest();
- LOG.trace("Poisoning request {}", request, cause);
- e.complete(request.toRequestFailure(cause));
- }
- queue.clear();
- }
package org.opendaylight.controller.cluster.access.client;
import static org.hamcrest.CoreMatchers.everyItem;
package org.opendaylight.controller.cluster.access.client;
import static org.hamcrest.CoreMatchers.everyItem;
-import static org.mockito.Matchers.any;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
import akka.actor.ActorSystem;
import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
import akka.actor.ActorSystem;
import java.util.Optional;
import java.util.function.Consumer;
import org.junit.After;
import java.util.Optional;
import java.util.function.Consumer;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
}
final Collection<ConnectionEntry> entries = queue.drain();
queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
}
final Collection<ConnectionEntry> entries = queue.drain();
- Assert.assertEquals(sentMessages, entries.size());
- Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+ assertEquals(sentMessages, entries.size());
+ assertThat(entries, everyItem(entryWithRequest(request)));
}
@Test
public void testTicksStalling() {
final long now = Ticker.systemTicker().read();
}
@Test
public void testTicksStalling() {
final long now = Ticker.systemTicker().read();
- Assert.assertEquals(0, queue.ticksStalling(now));
+ assertEquals(0, queue.ticksStalling(now));
final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
final Optional<TransmittedConnectionEntry> completed1 =
queue.complete(new SuccessEnvelope(success1, sessionId, txSequence, 1L), now);
final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
final Optional<TransmittedConnectionEntry> completed1 =
queue.complete(new SuccessEnvelope(success1, sessionId, txSequence, 1L), now);
- Assert.assertFalse(completed1.isPresent());
+ assertFalse(completed1.isPresent());
//different response sequence
final long differentResponseSequence = 1L;
final RequestSuccess<?, ?> success2 =
new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, differentResponseSequence);
final Optional<TransmittedConnectionEntry> completed2 =
queue.complete(new SuccessEnvelope(success2, sessionId, txSequence, 1L), now);
//different response sequence
final long differentResponseSequence = 1L;
final RequestSuccess<?, ?> success2 =
new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, differentResponseSequence);
final Optional<TransmittedConnectionEntry> completed2 =
queue.complete(new SuccessEnvelope(success2, sessionId, txSequence, 1L), now);
- Assert.assertFalse(completed2.isPresent());
+ assertFalse(completed2.isPresent());
//different tx sequence
final long differentTxSequence = 1L;
final RequestSuccess<?, ?> success3 =
new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
final Optional<TransmittedConnectionEntry> completed3 =
queue.complete(new SuccessEnvelope(success3, sessionId, differentTxSequence, 1L), now);
//different tx sequence
final long differentTxSequence = 1L;
final RequestSuccess<?, ?> success3 =
new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
final Optional<TransmittedConnectionEntry> completed3 =
queue.complete(new SuccessEnvelope(success3, sessionId, differentTxSequence, 1L), now);
- Assert.assertFalse(completed3.isPresent());
+ assertFalse(completed3.isPresent());
//different session id
final long differentSessionId = 1L;
final RequestSuccess<?, ?> success4 =
new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
final Optional<TransmittedConnectionEntry> completed4 =
queue.complete(new SuccessEnvelope(success4, differentSessionId, differentTxSequence, 1L), now);
//different session id
final long differentSessionId = 1L;
final RequestSuccess<?, ?> success4 =
new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
final Optional<TransmittedConnectionEntry> completed4 =
queue.complete(new SuccessEnvelope(success4, differentSessionId, differentTxSequence, 1L), now);
- Assert.assertFalse(completed4.isPresent());
+ assertFalse(completed4.isPresent());
}
@Test
public void testIsEmpty() {
}
@Test
public void testIsEmpty() {
- Assert.assertTrue(queue.isEmpty());
+ assertTrue(queue.isEmpty());
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
- Assert.assertFalse(queue.isEmpty());
+ assertFalse(queue.isEmpty());
final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
queue.enqueueOrForward(entry1, now);
queue.enqueueOrForward(entry2, now);
final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
queue.enqueueOrForward(entry1, now);
queue.enqueueOrForward(entry2, now);
- Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest());
+ assertEquals(entry1.getRequest(), queue.peek().getRequest());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
- queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
- verify(callback).accept(any(TransactionFailure.class));
- Assert.assertTrue(queue.isEmpty());
+ assertEquals(1, queue.poison().size());
}
@SuppressWarnings("unchecked")
}
@SuppressWarnings("unchecked")