From: Robert Varga Date: Thu, 1 Dec 2016 10:17:53 +0000 (+0100) Subject: BUG-5280: refactor AbstractClientConnection X-Git-Tag: release/carbon~370 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=b604e392408dfbf46ee0a7e1ac4cd27170a83bff BUG-5280: refactor AbstractClientConnection The structure of AbstractClientConnection and its subclasses makes it hard to replay messages in a coordinated fashion. Furthermore splitting the inflight and pending and inflight queues into separate classes means we would have to jump through quite a few hoops to correctly calculate backpressure. Refactor the base class so it includes all the operations usually performed, with remoteMaxMessages() acting as the limiter, which disables transmission in connecting/reconnecting states. Change-Id: If743e4913aade7ed65ba60375d8b7d12c563cb96 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 0e9382dbba..bade34cb2f 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -13,6 +13,8 @@ import com.google.common.base.Preconditions; import com.google.common.base.Verify; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Map.Entry; import java.util.Optional; import java.util.Queue; import java.util.concurrent.TimeUnit; @@ -21,6 +23,7 @@ import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; @@ -45,7 +48,9 @@ public abstract class AbstractClientConnection { @VisibleForTesting static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30); + private final Queue inflight; private final Queue pending; + private final ClientActorContext context; private final Long cookie; @@ -54,21 +59,22 @@ public abstract class AbstractClientConnection { private long lastProgress; private AbstractClientConnection(final ClientActorContext context, final Long cookie, - final Queue pending) { + final Queue inflight, final Queue pending) { this.context = Preconditions.checkNotNull(context); this.cookie = Preconditions.checkNotNull(cookie); + this.inflight = Preconditions.checkNotNull(inflight); this.pending = Preconditions.checkNotNull(pending); this.lastProgress = readTime(); } // Do not allow subclassing outside of this package AbstractClientConnection(final ClientActorContext context, final Long cookie) { - this(context, cookie, new ArrayDeque<>(1)); + this(context, cookie, new ArrayDeque<>(), new ArrayDeque<>(1)); } // Do not allow subclassing outside of this package AbstractClientConnection(final AbstractClientConnection oldConnection) { - this(oldConnection.context, oldConnection.cookie, oldConnection.pending); + this(oldConnection.context, oldConnection.cookie, oldConnection.inflight, oldConnection.pending); } public final ClientActorContext context() { @@ -83,14 +89,6 @@ public abstract class AbstractClientConnection { return context.self(); } - final long readTime() { - return context.ticker().read(); - } - - final Queue pending() { - return pending; - } - /** * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke * from any thread. @@ -128,22 +126,55 @@ public abstract class AbstractClientConnection { public abstract Optional getBackendInfo(); + abstract ClientActorBehavior reconnectConnection(ClientActorBehavior current); + + abstract int remoteMaxMessages(); + + abstract Entry prepareForTransmit(Request req); + @GuardedBy("this") - void spliceToSuccessor(final ReconnectForwarder successor) { - ConnectionEntry entry = pending.poll(); + final void spliceToSuccessor(final ReconnectForwarder successor) { + ConnectionEntry entry = inflight.poll(); + while (entry != null) { + successor.forwardEntry(entry); + entry = inflight.poll(); + } + + entry = pending.poll(); while (entry != null) { successor.forwardEntry(entry); entry = pending.poll(); } } - final ConnectionEntry dequeEntry() { - lastProgress = readTime(); - return pending.poll(); + private long readTime() { + return context.ticker().read(); + } + + private void transmit(final ConnectionEntry entry) { + final Entry tuple = prepareForTransmit(entry.getRequest()); + final RequestEnvelope req = tuple.getValue(); + + // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread + // than the client actor thread, in which case the round-trip could be made faster than we can enqueue -- + // in which case the receive routine would not find the entry. + final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, req.getSessionId(), + req.getTxSequence(), readTime()); + inflight.add(txEntry); + + final ActorRef actor = tuple.getKey(); + LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), req, actor); + actor.tell(req, ActorRef.noSender()); } - void enqueueEntry(final ConnectionEntry entry) { - pending.add(entry); + final void enqueueEntry(final ConnectionEntry entry) { + if (inflight.size() < remoteMaxMessages()) { + transmit(entry); + LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this); + } else { + LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest()); + pending.add(entry); + } } /** @@ -167,7 +198,7 @@ public abstract class AbstractClientConnection { final ClientActorBehavior runTimer(final ClientActorBehavior current) { final long now = readTime(); - if (!isEmpty()) { + if (!inflight.isEmpty() || !pending.isEmpty()) { final long ticksSinceProgress = now - lastProgress; if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, @@ -196,10 +227,6 @@ public abstract class AbstractClientConnection { return current; } - boolean isEmpty() { - return pending.isEmpty(); - } - /* * We are using tri-state return here to indicate one of three conditions: * - if there is no timeout to schedule, return Optional.empty() @@ -208,7 +235,7 @@ public abstract class AbstractClientConnection { */ @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", justification = "Returning null Optional is documented in the API contract.") - final Optional checkTimeout(final ConnectionEntry head, final long now) { + private Optional checkTimeout(final ConnectionEntry head, final long now) { if (head == null) { return Optional.empty(); } @@ -230,21 +257,30 @@ public abstract class AbstractClientConnection { */ @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", justification = "Returning null Optional is documented in the API contract.") - Optional checkTimeout(final long now) { - return checkTimeout(pending.peek(), now); - } - - static void poisonQueue(final Queue queue, final RequestException cause) { - for (ConnectionEntry e : queue) { - final Request request = e.getRequest(); - LOG.trace("Poisoning request {}", request, cause); - e.complete(request.toRequestFailure(cause)); + @VisibleForTesting + final Optional checkTimeout(final long now) { + final Optional xmit = checkTimeout(inflight.peek(), now); + if (xmit == null) { + return null; } - queue.clear(); + final Optional pend = checkTimeout(pending.peek(), now); + if (pend == null) { + return null; + } + if (!xmit.isPresent()) { + return pend; + } + if (!pend.isPresent()) { + return xmit; + } + + return Optional.of(xmit.get().min(pend.get())); } - void poison(final RequestException cause) { + final void poison(final RequestException cause) { poisoned = cause; + + poisonQueue(inflight, cause); poisonQueue(pending, cause); } @@ -253,7 +289,98 @@ public abstract class AbstractClientConnection { return poisoned; } - abstract ClientActorBehavior reconnectConnection(ClientActorBehavior current); + final void receiveResponse(final ResponseEnvelope envelope) { + Optional maybeEntry = findMatchingEntry(inflight, envelope); + if (maybeEntry == null) { + LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope); + maybeEntry = findMatchingEntry(pending, envelope); + } - abstract void receiveResponse(final ResponseEnvelope envelope); + if (maybeEntry == null || !maybeEntry.isPresent()) { + LOG.warn("No request matching {} found, ignoring response", envelope); + return; + } + + final TransmittedConnectionEntry entry = maybeEntry.get(); + LOG.debug("Completing {} with {}", entry, envelope); + entry.complete(envelope.getMessage()); + + // We have freed up a slot, try to transmit something + int toSend = remoteMaxMessages() - inflight.size(); + while (toSend > 0) { + final ConnectionEntry e = pending.poll(); + if (e == null) { + break; + } + + LOG.debug("Transmitting entry {}", e); + transmit(e); + toSend--; + } + + lastProgress = readTime(); + } + + private static void poisonQueue(final Queue queue, final RequestException cause) { + for (ConnectionEntry e : queue) { + final Request request = e.getRequest(); + LOG.trace("Poisoning request {}", request, cause); + e.complete(request.toRequestFailure(cause)); + } + queue.clear(); + } + + /* + * We are using tri-state return here to indicate one of three conditions: + * - if a matching entry is found, return an Optional containing it + * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null + * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional + */ + @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", + justification = "Returning null Optional is documented in the API contract.") + private static Optional findMatchingEntry(final Queue queue, + final ResponseEnvelope envelope) { + // Try to find the request in a queue. Responses may legally come back in a different order, hence we need + // to use an iterator + final Iterator it = queue.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); + final Request request = e.getRequest(); + final Response response = envelope.getMessage(); + + // First check for matching target, or move to next entry + if (!request.getTarget().equals(response.getTarget())) { + continue; + } + + // Sanity-check logical sequence, ignore any out-of-order messages + if (request.getSequence() != response.getSequence()) { + LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope); + return Optional.empty(); + } + + // Check if the entry has (ever) been transmitted + if (!(e instanceof TransmittedConnectionEntry)) { + return Optional.empty(); + } + + final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e; + + // Now check session match + if (envelope.getSessionId() != te.getSessionId()) { + LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope); + return Optional.empty(); + } + if (envelope.getTxSequence() != te.getTxSequence()) { + LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope); + return Optional.empty(); + } + + LOG.debug("Completing request {} with {}", request, envelope); + it.remove(); + return Optional.of(te); + } + + return null; + } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java index 85ca5fee65..8646bfcba5 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java @@ -7,22 +7,8 @@ */ package org.opendaylight.controller.cluster.access.client; -import akka.actor.ActorRef; import com.google.common.base.Preconditions; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayDeque; -import java.util.Iterator; import java.util.Optional; -import java.util.Queue; -import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.access.ABIVersion; -import org.opendaylight.controller.cluster.access.concepts.Request; -import org.opendaylight.controller.cluster.access.concepts.RequestException; -import org.opendaylight.controller.cluster.access.concepts.Response; -import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; /** * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its @@ -33,23 +19,18 @@ import scala.concurrent.duration.FiniteDuration; * @param Concrete {@link BackendInfo} type */ abstract class AbstractReceivingClientConnection extends AbstractClientConnection { - private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class); - - private final Queue inflight = new ArrayDeque<>(); private final T backend; - - private long lastProgress; + private long nextTxSequence; AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) { super(context, cookie); this.backend = Preconditions.checkNotNull(backend); - this.lastProgress = readTime(); } AbstractReceivingClientConnection(final AbstractReceivingClientConnection oldConnection) { super(oldConnection); this.backend = oldConnection.backend; - this.lastProgress = oldConnection.lastProgress; + this.nextTxSequence = oldConnection.nextTxSequence; } @Override @@ -57,160 +38,11 @@ abstract class AbstractReceivingClientConnection extends return Optional.of(backend); } - final ActorRef remoteActor() { - return backend.getActor(); - } - - final int remoteMaxMessages() { - return backend.getMaxMessages(); - } - - final ABIVersion remoteVersion() { - return backend.getVersion(); + final T backend() { + return backend; } - final long sessionId() { - return backend.getSessionId(); - } - - final int inflightSize() { - return inflight.size(); - } - - final void appendToInflight(final TransmittedConnectionEntry entry) { - // This should never fail - inflight.add(entry); - } - - @GuardedBy("this") - @Override - void spliceToSuccessor(final ReconnectForwarder successor) { - ConnectionEntry entry = inflight.poll(); - while (entry != null) { - successor.forwardEntry(entry); - entry = inflight.poll(); - } - - super.spliceToSuccessor(successor); - } - - @Override - void receiveResponse(final ResponseEnvelope envelope) { - Optional maybeEntry = findMatchingEntry(inflight, envelope); - if (maybeEntry == null) { - LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope); - maybeEntry = findMatchingEntry(pending(), envelope); - } - - if (maybeEntry == null || !maybeEntry.isPresent()) { - LOG.warn("No request matching {} found, ignoring response", envelope); - return; - } - - lastProgress = readTime(); - - final TransmittedConnectionEntry entry = maybeEntry.get(); - LOG.debug("Completing {} with {}", entry, envelope); - entry.complete(envelope.getMessage()); - - // We have freed up a slot, try to transmit something - final int toSend = remoteMaxMessages() - inflight.size(); - if (toSend > 0) { - sendMessages(toSend); - } - } - - @Override - boolean isEmpty() { - return inflight.isEmpty() && super.isEmpty(); - } - - @Override - void poison(final RequestException cause) { - super.poison(cause); - poisonQueue(inflight, cause); - } - - /** - * Transmit a given number of messages. - * - * @param count Number of messages to transmit, guaranteed to be positive. - */ - abstract void sendMessages(int count); - - /* - * We are using tri-state return here to indicate one of three conditions: - * - if a matching entry is found, return an Optional containing it - * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null - * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional - */ - @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", - justification = "Returning null Optional is documented in the API contract.") - private static Optional findMatchingEntry(final Queue queue, - final ResponseEnvelope envelope) { - // Try to find the request in a queue. Responses may legally come back in a different order, hence we need - // to use an iterator - final Iterator it = queue.iterator(); - while (it.hasNext()) { - final ConnectionEntry e = it.next(); - final Request request = e.getRequest(); - final Response response = envelope.getMessage(); - - // First check for matching target, or move to next entry - if (!request.getTarget().equals(response.getTarget())) { - continue; - } - - // Sanity-check logical sequence, ignore any out-of-order messages - if (request.getSequence() != response.getSequence()) { - LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope); - return Optional.empty(); - } - - // Check if the entry has (ever) been transmitted - if (!(e instanceof TransmittedConnectionEntry)) { - return Optional.empty(); - } - - final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e; - - // Now check session match - if (envelope.getSessionId() != te.getSessionId()) { - LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope); - return Optional.empty(); - } - if (envelope.getTxSequence() != te.getTxSequence()) { - LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope); - return Optional.empty(); - } - - LOG.debug("Completing request {} with {}", request, envelope); - it.remove(); - return Optional.of(te); - } - - return null; - } - - @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", - justification = "Returning null Optional is documented in the API contract.") - @Override - final Optional checkTimeout(final long now) { - final Optional xmit = checkTimeout(inflight.peek(), now); - if (xmit == null) { - return null; - } - final Optional pend = super.checkTimeout(now); - if (pend == null) { - return null; - } - if (!xmit.isPresent()) { - return pend; - } - if (!pend.isPresent()) { - return xmit; - } - - return Optional.of(xmit.get().min(pend.get())); + final long nextTxSequence() { + return nextTxSequence++; } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java index eab11429b8..9dad825e7e 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java @@ -9,72 +9,35 @@ package org.opendaylight.controller.cluster.access.client; import akka.actor.ActorRef; import com.google.common.annotations.Beta; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.Map.Entry; import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Beta @NotThreadSafe public final class ConnectedClientConnection extends AbstractReceivingClientConnection { - private static final Logger LOG = LoggerFactory.getLogger(ConnectedClientConnection.class); - - private long nextTxSequence; - public ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) { super(context, cookie, backend); } - private void transmit(final ConnectionEntry entry) { - final long txSequence = nextTxSequence++; - - final RequestEnvelope toSend = new RequestEnvelope(entry.getRequest().toVersion(remoteVersion()), sessionId(), - txSequence); - - // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread - // than the client actor thread, in which case the round-trip could be made faster than we can enqueue -- - // in which case the receive routine would not find the entry. - final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, sessionId(), txSequence, - readTime()); - appendToInflight(txEntry); - - final ActorRef actor = remoteActor(); - LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), toSend, actor); - actor.tell(toSend, ActorRef.noSender()); - } - @Override - void enqueueEntry(final ConnectionEntry entry) { - if (inflightSize() < remoteMaxMessages()) { - transmit(entry); - LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this); - } else { - LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest()); - super.enqueueEntry(entry); - } + ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { + final ReconnectingClientConnection next = new ReconnectingClientConnection<>(this); + setForwarder(new SimpleReconnectForwarder(next)); + current.reconnectConnection(this, next); + return current; } @Override - void sendMessages(final int count) { - int toSend = count; - - while (toSend > 0) { - final ConnectionEntry e = dequeEntry(); - if (e == null) { - break; - } - - LOG.debug("Transmitting entry {}", e); - transmit(e); - toSend--; - } + int remoteMaxMessages() { + return backend().getMaxMessages(); } @Override - ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { - final ReconnectingClientConnection next = new ReconnectingClientConnection<>(this); - setForwarder(new SimpleReconnectForwarder(next)); - current.reconnectConnection(this, next); - return current; + Entry prepareForTransmit(final Request req) { + return new SimpleImmutableEntry<>(backend().getActor(), new RequestEnvelope( + req.toVersion(backend().getVersion()), backend().getSessionId(), nextTxSequence())); } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java index cdadf1d601..e28f9b35ed 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnection.java @@ -7,16 +7,15 @@ */ package org.opendaylight.controller.cluster.access.client; +import akka.actor.ActorRef; import com.google.common.annotations.Beta; +import java.util.Map.Entry; import java.util.Optional; -import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; @Beta public final class ConnectingClientConnection extends AbstractClientConnection { - private static final Logger LOG = LoggerFactory.getLogger(ConnectingClientConnection.class); - // Initial state, never instantiated externally ConnectingClientConnection(final ClientActorContext context, final Long cookie) { super(context, cookie); @@ -28,12 +27,18 @@ public final class ConnectingClientConnection extends Abs } @Override - void receiveResponse(final ResponseEnvelope envelope) { - LOG.warn("Initial connection {} ignoring response {}", this, envelope); + ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { + throw new UnsupportedOperationException("Attempted to reconnect a connecting connection"); } @Override - ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { - throw new UnsupportedOperationException("Attempted to reconnect a connecting connection"); + Entry prepareForTransmit(final Request req) { + // This is guarded by remoteMaxMessages() == 0 + throw new UnsupportedOperationException("Attempted to transmit on a connecting connection"); + } + + @Override + int remoteMaxMessages() { + return 0; } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java index 0209f95bce..e15a949600 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnection.java @@ -7,6 +7,10 @@ */ package org.opendaylight.controller.cluster.access.client; +import akka.actor.ActorRef; +import java.util.Map.Entry; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,15 +28,21 @@ public final class ReconnectingClientConnection extends A super(oldConnection); } - @Override - void sendMessages(final int count) { - LOG.debug("Connection {} is reconnecting, not transmitting anything", this); - } - @Override ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { // Intentional no-op LOG.debug("Skipping reconnect of already-reconnecting connection {}", this); return current; } + + @Override + Entry prepareForTransmit(final Request req) { + // This is guarded by remoteMaxMessages() == 0 + throw new UnsupportedOperationException("Attempted to transmit on a reconnecting connection"); + } + + @Override + int remoteMaxMessages() { + return 0; + } }