X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FConnectedClientConnection.java;h=9198d8fe12c66980802917085bc4c548b8b5e484;hb=d2f03dcaedbf68a62d524fb066a1044007714c76;hp=6c1507c50d7f89fc11da21a7ecb55b4ec4b5a0c9;hpb=320a4e5cd2d9d80468a3f82798744f2035488218;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java index 6c1507c50d..9198d8fe12 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java @@ -7,64 +7,16 @@ */ package org.opendaylight.controller.cluster.access.client; -import akka.actor.ActorRef; import com.google.common.annotations.Beta; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Beta @NotThreadSafe public final class ConnectedClientConnection extends AbstractReceivingClientConnection { - private static final Logger LOG = LoggerFactory.getLogger(ConnectedClientConnection.class); - - private long nextTxSequence; - - public ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) { + ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) { super(context, cookie, backend); } - private TransmittedConnectionEntry transmit(final ConnectionEntry entry) { - final long txSequence = nextTxSequence++; - - final RequestEnvelope toSend = new RequestEnvelope(entry.getRequest().toVersion(remoteVersion()), sessionId(), - txSequence); - - final ActorRef actor = remoteActor(); - LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), toSend, actor); - actor.tell(toSend, ActorRef.noSender()); - - return new TransmittedConnectionEntry(entry, sessionId(), txSequence, readTime()); - } - - @Override - void enqueueEntry(final ConnectionEntry entry) { - if (inflightSize() < remoteMaxMessages()) { - appendToInflight(transmit(entry)); - LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this); - } else { - LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest()); - super.enqueueEntry(entry); - } - } - - @Override - void sendMessages(final int count) { - int toSend = count; - - while (toSend > 0) { - final ConnectionEntry e = dequeEntry(); - if (e == null) { - break; - } - - LOG.debug("Transmitting entry {}", e); - appendToInflight(transmit(e)); - toSend--; - } - } - @Override ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { final ReconnectingClientConnection next = new ReconnectingClientConnection<>(this);