X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FConnectedClientConnection.java;h=0afc7acf4945996e61e5996060a3b32424eba177;hp=eab11429b8a9e3403ac92adbdb1e1f737865e89f;hb=417f3a8c2401e6c8ce3c91ea969da24929c24c1c;hpb=db9a673c114febc785fbd324947ac2c3e3095d06 diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java index eab11429b8..0afc7acf49 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java @@ -7,72 +7,20 @@ */ package org.opendaylight.controller.cluster.access.client; -import akka.actor.ActorRef; import com.google.common.annotations.Beta; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.controller.cluster.access.concepts.RequestException; @Beta @NotThreadSafe public final class ConnectedClientConnection extends AbstractReceivingClientConnection { - private static final Logger LOG = LoggerFactory.getLogger(ConnectedClientConnection.class); - - private long nextTxSequence; - - public ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) { + ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) { super(context, cookie, backend); } - private void transmit(final ConnectionEntry entry) { - final long txSequence = nextTxSequence++; - - final RequestEnvelope toSend = new RequestEnvelope(entry.getRequest().toVersion(remoteVersion()), sessionId(), - txSequence); - - // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread - // than the client actor thread, in which case the round-trip could be made faster than we can enqueue -- - // in which case the receive routine would not find the entry. - final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, sessionId(), txSequence, - readTime()); - appendToInflight(txEntry); - - final ActorRef actor = remoteActor(); - LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), toSend, actor); - actor.tell(toSend, ActorRef.noSender()); - } - - @Override - void enqueueEntry(final ConnectionEntry entry) { - if (inflightSize() < remoteMaxMessages()) { - transmit(entry); - LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this); - } else { - LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest()); - super.enqueueEntry(entry); - } - } - - @Override - void sendMessages(final int count) { - int toSend = count; - - while (toSend > 0) { - final ConnectionEntry e = dequeEntry(); - if (e == null) { - break; - } - - LOG.debug("Transmitting entry {}", e); - transmit(e); - toSend--; - } - } - @Override - ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { - final ReconnectingClientConnection next = new ReconnectingClientConnection<>(this); + ClientActorBehavior lockedReconnect(final ClientActorBehavior current, final RequestException cause) { + final ReconnectingClientConnection next = new ReconnectingClientConnection<>(this, cause); setForwarder(new SimpleReconnectForwarder(next)); current.reconnectConnection(this, next); return current;