X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FAbstractClientConnection.java;h=c32f7b2532cbe2d259357f5219b15835f941096a;hb=refs%2Fchanges%2F10%2F78310%2F5;hp=1fa67632eab593e400511433ad2ef900798df726;hpb=dafc95d149bc62f101de37e94b9b5e3526d4e87b;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 1fa67632ea..c32f7b2532 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -78,6 +78,7 @@ public abstract class AbstractClientConnection { @GuardedBy("lock") private final TransmitQueue queue; private final Long cookie; + private final String backendName; @GuardedBy("lock") private boolean haveTimer; @@ -90,9 +91,11 @@ public abstract class AbstractClientConnection { private volatile RequestException poisoned; // Private constructor to avoid code duplication. - private AbstractClientConnection(final AbstractClientConnection oldConn, final TransmitQueue newQueue) { + private AbstractClientConnection(final AbstractClientConnection oldConn, final TransmitQueue newQueue, + final String backendName) { this.context = Preconditions.checkNotNull(oldConn.context); this.cookie = Preconditions.checkNotNull(oldConn.cookie); + this.backendName = Preconditions.checkNotNull(backendName); this.queue = Preconditions.checkNotNull(newQueue); // Will be updated in finishReplay if needed. this.lastReceivedTicks = oldConn.lastReceivedTicks; @@ -100,9 +103,11 @@ public abstract class AbstractClientConnection { // This constructor is only to be called by ConnectingClientConnection constructor. // Do not allow subclassing outside of this package - AbstractClientConnection(final ClientActorContext context, final Long cookie, final int queueDepth) { + AbstractClientConnection(final ClientActorContext context, final Long cookie, final String backendName, + final int queueDepth) { this.context = Preconditions.checkNotNull(context); this.cookie = Preconditions.checkNotNull(cookie); + this.backendName = Preconditions.checkNotNull(backendName); this.queue = new TransmitQueue.Halted(queueDepth); this.lastReceivedTicks = currentTime(); } @@ -110,13 +115,15 @@ public abstract class AbstractClientConnection { // This constructor is only to be called (indirectly) by ReconnectingClientConnection constructor. // Do not allow subclassing outside of this package AbstractClientConnection(final AbstractClientConnection oldConn) { - this(oldConn, new TransmitQueue.Halted(oldConn.queue, oldConn.currentTime())); + this(oldConn, new TransmitQueue.Halted(oldConn.queue, oldConn.currentTime()), oldConn.backendName); } // This constructor is only to be called (indirectly) by ConnectedClientConnection constructor. // Do not allow subclassing outside of this package - AbstractClientConnection(final AbstractClientConnection oldConn, final T newBackend, final int queueDepth) { - this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime())); + AbstractClientConnection(final AbstractClientConnection oldConn, final T newBackend, + final int queueDepth) { + this(oldConn, new TransmitQueue.Transmitting(oldConn.queue, queueDepth, newBackend, oldConn.currentTime(), + Preconditions.checkNotNull(oldConn.context).messageSlicer()), newBackend.getName()); } public final ClientActorContext context() { @@ -167,24 +174,42 @@ public abstract class AbstractClientConnection { enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime()); } - public final long enqueueEntry(final ConnectionEntry entry, final long now) { + private long enqueueOrForward(final ConnectionEntry entry, final long now) { lock.lock(); try { - final RequestException maybePoison = poisoned; - if (maybePoison != null) { - throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison); - } + commonEnqueue(entry, now); + return queue.enqueueOrForward(entry, now); + } finally { + lock.unlock(); + } + } - if (queue.isEmpty()) { - // The queue is becoming non-empty, schedule a timer. - scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now); - } - return queue.enqueue(entry, now); + /** + * Enqueue an entry, possibly also transmitting it. + */ + public final void enqueueEntry(final ConnectionEntry entry, final long now) { + lock.lock(); + try { + commonEnqueue(entry, now); + queue.enqueueOrReplay(entry, now); } finally { lock.unlock(); } } + @GuardedBy("lock") + private void commonEnqueue(final ConnectionEntry entry, final long now) { + final RequestException maybePoison = poisoned; + if (maybePoison != null) { + throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison); + } + + if (queue.isEmpty()) { + // The queue is becoming non-empty, schedule a timer. + scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now); + } + } + // To be called from ClientActorBehavior on ConnectedClientConnection after entries are replayed. final void cancelDebt() { queue.cancelDebt(currentTime()); @@ -227,7 +252,7 @@ public abstract class AbstractClientConnection { RequestException runtimeRequestException); final void sendEntry(final ConnectionEntry entry, final long now) { - long delay = enqueueEntry(entry, now); + long delay = enqueueOrForward(entry, now); try { if (delay >= DEBUG_DELAY_NANOS) { if (delay > MAX_DELAY_NANOS) { @@ -243,7 +268,7 @@ public abstract class AbstractClientConnection { TimeUnit.NANOSECONDS.sleep(delay); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now); + LOG.debug("Interrupted after sleeping {}ns", currentTime() - now, e); } } @@ -384,9 +409,7 @@ public abstract class AbstractClientConnection { queue.remove(now); LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head); - final double time = beenOpen * 1.0 / 1_000_000_000; - head.complete(head.getRequest().toRequestFailure( - new RequestTimeoutException("Timed out after " + time + "seconds"))); + timeoutEntry(head, beenOpen); } LOG.debug("Connection {} timed out {} tasks", this, tasksTimedOut); @@ -397,6 +420,20 @@ public abstract class AbstractClientConnection { return Optional.empty(); } + private void timeoutEntry(final ConnectionEntry entry, final long beenOpen) { + // Timeouts needs to be re-scheduled on actor thread because we are holding the lock on the current queue, + // which may be the tail of a successor chain. This is a problem if the callback attempts to send a request + // because that will attempt to lock the chain from the start, potentially causing a deadlock if there is + // a concurrent attempt to transmit. + context.executeInActor(current -> { + final double time = beenOpen * 1.0 / 1_000_000_000; + entry.complete(entry.getRequest().toRequestFailure( + new RequestTimeoutException(entry.getRequest() + " timed out after " + time + + " seconds. The backend for " + backendName + " is not available."))); + return current; + }); + } + final void poison(final RequestException cause) { lock.lock(); try {