2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13 import java.util.ArrayDeque;
14 import java.util.Iterator;
15 import java.util.Optional;
16 import java.util.Queue;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.access.ABIVersion;
19 import org.opendaylight.controller.cluster.access.concepts.Request;
20 import org.opendaylight.controller.cluster.access.concepts.RequestException;
21 import org.opendaylight.controller.cluster.access.concepts.Response;
22 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.duration.FiniteDuration;
28 * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two out of three of its
29 * subclasses. It allows us to share some code.
31 * @author Robert Varga
33 * @param <T> Concrete {@link BackendInfo} type
35 abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
36 private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class);
// Queue of TransmittedConnectionEntry objects: receiveResponse() searches it for the request a response belongs to,
// spliceToSuccessor() drains it and checkTimeout() inspects its head.
38 private final Queue<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
// Backend this connection is bound to; it backs getBackendInfo(), remoteActor(), remoteMaxMessages(),
// remoteVersion() and sessionId().
39 private final T backend;
// Value of readTime() taken at construction and again each time receiveResponse() matches a response.
41 private long lastProgress;
// Creates a connection bound to the given backend. The context and cookie are handed straight to the superclass
// constructor; the backend must not be null, otherwise Preconditions.checkNotNull throws a NullPointerException.
43 AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
44 super(context, cookie);
45 this.backend = Preconditions.checkNotNull(backend);
// Record the current readTime() value as the initial progress timestamp.
46 this.lastProgress = readTime();
// Creates a connection that takes over the backend and the last-progress timestamp of oldConnection.
// NOTE(review): any superclass constructor call is not visible in this listing - confirm against the full file.
49 AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
51 this.backend = oldConnection.backend;
52 this.lastProgress = oldConnection.lastProgress;
// Returns the backend held by this connection. Optional.of() is used, so the result is never empty.
56 public final Optional<T> getBackendInfo() {
57 return Optional.of(backend);
// Returns the actor reference reported by the backend (backend.getActor()).
60 final ActorRef remoteActor() {
61 return backend.getActor();
// Returns the maximum message count reported by the backend. receiveResponse() subtracts the inflight queue size
// from this value to work out how many further messages to send.
64 final int remoteMaxMessages() {
65 return backend.getMaxMessages();
// Returns the ABI version reported by the backend (backend.getVersion()).
68 final ABIVersion remoteVersion() {
69 return backend.getVersion();
// Returns the session identifier reported by the backend (backend.getSessionId()).
72 final long sessionId() {
73 return backend.getSessionId();
// Returns the number of entries currently held in the inflight queue.
76 final int inflightSize() {
77 return inflight.size();
// NOTE(review): presumably adds the entry to the inflight queue and checks that the insertion succeeded; the
// statements doing so are not visible in this listing - confirm against the full file.
80 final void appendToInflight(final TransmittedConnectionEntry entry) {
81 // This should never fail
// Hands this connection's entries over to a successor: the inflight queue is drained from its head, each entry
// being passed to successor.forwardEntry() in queue order, and then the superclass splices its own state.
87 void spliceToSuccessor(final ReconnectForwarder successor) {
// poll() removes the head and returns null once the queue is empty, which ends the loop.
88 ConnectionEntry entry = inflight.poll();
89 while (entry != null) {
90 successor.forwardEntry(entry);
91 entry = inflight.poll();
94 super.spliceToSuccessor(successor);
// Handles a response envelope: locates the request entry it answers, completes that entry with the response
// message and then asks the subclass to transmit further messages.
98 void receiveResponse(final ResponseEnvelope<?> envelope) {
// Search the inflight queue first. findMatchingEntry() is tri-state: a null result means "not in this queue,
// keep looking", so the pending queue is searched next.
99 Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
100 if (maybeEntry == null) {
101 LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
102 maybeEntry = findMatchingEntry(pending(), envelope);
// Either neither queue held a match (null) or a conflicting entry was encountered (empty Optional).
105 if (maybeEntry == null || !maybeEntry.isPresent()) {
106 LOG.warn("No request matching {} found, ignoring response", envelope);
// A response was matched: refresh the progress timestamp.
110 lastProgress = readTime();
112 final TransmittedConnectionEntry entry = maybeEntry.get();
113 LOG.debug("Completing {} with {}", entry, envelope);
114 entry.complete(envelope.getMessage());
116 // We have freed up a slot, try to transmit something
117 final int toSend = remoteMaxMessages() - inflight.size();
119 sendMessages(toSend);
// Evaluates to true only when the inflight queue is empty and the superclass also reports itself empty.
125 return inflight.isEmpty() && super.isEmpty();
// Poisons this connection with the given cause; the inflight queue is handed to poisonQueue() with that cause.
// NOTE(review): any handling of the superclass's own state is not visible in this listing - confirm.
129 void poison(final RequestException cause) {
131 poisonQueue(inflight, cause);
135 * Transmit a given number of messages.
137 * @param count Number of messages to transmit, guaranteed to be positive.
// Called from receiveResponse() with remoteMaxMessages() - inflight.size() as the count.
139 abstract void sendMessages(int count);
142 * We are using tri-state return here to indicate one of three conditions:
143 * - if a matching entry is found, return an Optional containing it
144 * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
145 * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
// Walks the queue from its head looking for the entry whose request is answered by the response in envelope.
// The checks run in this order: request target, request sequence, whether the entry is a
// TransmittedConnectionEntry, session id, transmit sequence.
147 @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
148 justification = "Returning null Optional is documented in the API contract.")
149 private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
150 final ResponseEnvelope<?> envelope) {
151 // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
152 // to use an iterator
153 final Iterator<? extends ConnectionEntry> it = queue.iterator();
154 while (it.hasNext()) {
155 final ConnectionEntry e = it.next();
156 final Request<?, ?> request = e.getRequest();
157 final Response<?, ?> response = envelope.getMessage();
159 // First check for matching target, or move to next entry
160 if (!request.getTarget().equals(response.getTarget())) {
164 // Sanity-check logical sequence, ignore any out-of-order messages
165 if (request.getSequence() != response.getSequence()) {
166 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
167 return Optional.empty();
170 // Check if the entry has (ever) been transmitted
171 if (!(e instanceof TransmittedConnectionEntry)) {
172 return Optional.empty();
175 final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
177 // Now check session match
178 if (envelope.getSessionId() != te.getSessionId()) {
179 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
180 return Optional.empty();
// Finally the transmit sequence has to agree as well; unlike the checks above, a mismatch here is logged at WARN.
182 if (envelope.getTxSequence() != te.getTxSequence()) {
183 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
184 return Optional.empty();
// Every check passed: this entry is the one the response answers.
187 LOG.debug("Completing request {} with {}", request, envelope);
189 return Optional.of(te);
// Combines two timeout checks: one on the head of the inflight queue (inflight.peek(), which is null when the
// queue is empty) and the superclass's own check. When both yield a duration, the shorter one is returned.
// NOTE(review): the bodies of the branches below are not visible in this listing - confirm their results
// against the full file.
195 @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
196 justification = "Returning null Optional is documented in the API contract.")
198 final Optional<FiniteDuration> checkTimeout(final long now) {
199 final Optional<FiniteDuration> xmit = checkTimeout(inflight.peek(), now);
203 final Optional<FiniteDuration> pend = super.checkTimeout(now);
207 if (!xmit.isPresent()) {
210 if (!pend.isPresent()) {
// Both checks produced a duration: pick the smaller of the two.
214 return Optional.of(xmit.get().min(pend.get()));