X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FConnectedClientConnection.java;h=8a5af45d155162a1e03e94448bfd5a6964604de3;hb=3859df9beca8f13f1ff2b2744ed3470a1715bec3;hp=eab11429b8a9e3403ac92adbdb1e1f737865e89f;hpb=db9a673c114febc785fbd324947ac2c3e3095d06;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java index eab11429b8..8a5af45d15 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java @@ -7,72 +7,19 @@ */ package org.opendaylight.controller.cluster.access.client; -import akka.actor.ActorRef; import com.google.common.annotations.Beta; -import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.controller.cluster.access.concepts.RequestException; @Beta -@NotThreadSafe public final class ConnectedClientConnection extends AbstractReceivingClientConnection { - private static final Logger LOG = LoggerFactory.getLogger(ConnectedClientConnection.class); - private long nextTxSequence; - - public ConnectedClientConnection(final ClientActorContext context, final Long cookie, final T backend) { - super(context, cookie, backend); - } - - private void transmit(final ConnectionEntry entry) { - final long txSequence = nextTxSequence++; - - final RequestEnvelope toSend = new RequestEnvelope(entry.getRequest().toVersion(remoteVersion()), sessionId(), - txSequence); - - // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread - // than the client actor thread, in which case the round-trip could be made faster than we can enqueue -- - // in which case the receive routine would not find the entry. - final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, sessionId(), txSequence, - readTime()); - appendToInflight(txEntry); - - final ActorRef actor = remoteActor(); - LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), toSend, actor); - actor.tell(toSend, ActorRef.noSender()); - } - - @Override - void enqueueEntry(final ConnectionEntry entry) { - if (inflightSize() < remoteMaxMessages()) { - transmit(entry); - LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this); - } else { - LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest()); - super.enqueueEntry(entry); - } - } - - @Override - void sendMessages(final int count) { - int toSend = count; - - while (toSend > 0) { - final ConnectionEntry e = dequeEntry(); - if (e == null) { - break; - } - - LOG.debug("Transmitting entry {}", e); - transmit(e); - toSend--; - } + ConnectedClientConnection(final AbstractClientConnection oldConnection, final T newBackend) { + super(oldConnection, newBackend); } @Override - ClientActorBehavior reconnectConnection(final ClientActorBehavior current) { - final ReconnectingClientConnection next = new ReconnectingClientConnection<>(this); + ClientActorBehavior lockedReconnect(final ClientActorBehavior current, final RequestException cause) { + final ReconnectingClientConnection next = new ReconnectingClientConnection<>(this, cause); setForwarder(new SimpleReconnectForwarder(next)); current.reconnectConnection(this, next); return current;