9ab80d0d0085df1ef612e48606079b4eae413ea0
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterables;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Iterator;
17 import java.util.Optional;
18 import java.util.Queue;
19 import javax.annotation.concurrent.NotThreadSafe;
20 import org.opendaylight.controller.cluster.access.concepts.Request;
21 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
22 import org.opendaylight.controller.cluster.access.concepts.RequestException;
23 import org.opendaylight.controller.cluster.access.concepts.Response;
24 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * This queue is internally split into two queues for performance reasons, both memory efficiency and copy
30  * operations.
31  *
32  * <p>
33  * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
34  * complete in the order in which they were sent -- hence the head of the queue does not increase linearly,
35  * but can involve spurious removals of non-head entries.
36  *
37  * <p>
38  * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
39  * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
40  * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
41  *
42  * <p>
43  * Note that in transient case of reconnect, when the backend gives us a lower number of maximum in-flight entries
44  * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional
45  * scenario, hence we consciously ignore it to keep the design relatively simple.
46  *
47  * <p>
48  * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
49  *
50  * @author Robert Varga
51  */
52 @NotThreadSafe
53 abstract class TransmitQueue {
54     static final class Halted extends TransmitQueue {
55         Halted(final int targetDepth) {
56             super(targetDepth);
57         }
58
59         @Override
60         int canTransmitCount(final int inflightSize) {
61             return 0;
62         }
63
64         @Override
65         TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
66             throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
67         }
68     }
69
70     static final class Transmitting extends TransmitQueue {
71         private final BackendInfo backend;
72         private long nextTxSequence;
73
74         Transmitting(final int targetDepth, final BackendInfo backend) {
75             super(targetDepth);
76             this.backend = Preconditions.checkNotNull(backend);
77         }
78
79         @Override
80         int canTransmitCount(final int inflightSize) {
81             return backend.getMaxMessages() - inflightSize;
82         }
83
84         @Override
85         TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
86             final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
87                 backend.getSessionId(), nextTxSequence++);
88
89             final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
90                 env.getTxSequence(), now);
91             backend.getActor().tell(env, ActorRef.noSender());
92             return ret;
93         }
94     }
95
96     private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
97
98     private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
99     private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
100     private final ProgressTracker tracker;
101     private ReconnectForwarder successor;
102
103     TransmitQueue(final int targetDepth) {
104         tracker = new AveragingProgressTracker(targetDepth);
105     }
106
107     final Iterable<ConnectionEntry> asIterable() {
108         return Iterables.concat(inflight, pending);
109     }
110
111     final long ticksStalling(final long now) {
112         return tracker.ticksStalling(now);
113     }
114
115     final boolean hasSuccessor() {
116         return successor != null;
117     }
118
119     // If a matching request was found, this will track a task was closed.
120     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
121         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
122         if (maybeEntry == null) {
123             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
124             maybeEntry = findMatchingEntry(pending, envelope);
125         }
126
127         if (maybeEntry == null || !maybeEntry.isPresent()) {
128             LOG.warn("No request matching {} found, ignoring response", envelope);
129             return Optional.empty();
130         }
131
132         final TransmittedConnectionEntry entry = maybeEntry.get();
133         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
134
135         // We have freed up a slot, try to transmit something
136         int toSend = canTransmitCount(inflight.size());
137         while (toSend > 0) {
138             final ConnectionEntry e = pending.poll();
139             if (e == null) {
140                 break;
141             }
142
143             LOG.debug("Transmitting entry {}", e);
144             transmit(e, now);
145             toSend--;
146         }
147
148         return Optional.of(entry);
149     }
150
151     /**
152      * Enqueue an entry, possibly also transmitting it.
153      *
154      * @return Delay to be forced on the calling thread, in nanoseconds.
155      */
156     final long enqueue(final ConnectionEntry entry, final long now) {
157         if (successor != null) {
158             successor.forwardEntry(entry, now);
159             return 0;
160         }
161
162         // XXX: we should place a guard against incorrect entry sequences:
163         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
164
165         // Reserve an entry before we do anything that can fail
166         final long delay = tracker.openTask(now);
167         if (canTransmitCount(inflight.size()) <= 0) {
168             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
169             pending.add(entry);
170         } else {
171             // We are not thread-safe and are supposed to be externally-guarded,
172             // hence send-before-record should be fine.
173             // This needs to be revisited if the external guards are lowered.
174             inflight.offer(transmit(entry, now));
175             LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
176         }
177         return delay;
178     }
179
180     /**
181      * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
182      */
183     abstract int canTransmitCount(int inflightSize);
184
185     abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
186
187     final boolean isEmpty() {
188         return inflight.isEmpty() && pending.isEmpty();
189     }
190
191     final ConnectionEntry peek() {
192         final ConnectionEntry ret = inflight.peek();
193         if (ret != null) {
194             return ret;
195         }
196
197         return pending.peek();
198     }
199
200     final void poison(final RequestException cause) {
201         poisonQueue(inflight, cause);
202         poisonQueue(pending, cause);
203     }
204
205     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
206         Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
207         successor = Preconditions.checkNotNull(forwarder);
208         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
209
210         ConnectionEntry entry = inflight.poll();
211         while (entry != null) {
212             successor.forwardEntry(entry, now);
213             entry = inflight.poll();
214         }
215
216         entry = pending.poll();
217         while (entry != null) {
218             successor.forwardEntry(entry, now);
219             entry = pending.poll();
220         }
221     }
222
223     /*
224      * We are using tri-state return here to indicate one of three conditions:
225      * - if a matching entry is found, return an Optional containing it
226      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
227      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
228      */
229     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
230             justification = "Returning null Optional is documented in the API contract.")
231     private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
232             final ResponseEnvelope<?> envelope) {
233         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
234         // to use an iterator
235         final Iterator<? extends ConnectionEntry> it = queue.iterator();
236         while (it.hasNext()) {
237             final ConnectionEntry e = it.next();
238             final Request<?, ?> request = e.getRequest();
239             final Response<?, ?> response = envelope.getMessage();
240
241             // First check for matching target, or move to next entry
242             if (!request.getTarget().equals(response.getTarget())) {
243                 continue;
244             }
245
246             // Sanity-check logical sequence, ignore any out-of-order messages
247             if (request.getSequence() != response.getSequence()) {
248                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
249                 return Optional.empty();
250             }
251
252             // Check if the entry has (ever) been transmitted
253             if (!(e instanceof TransmittedConnectionEntry)) {
254                 return Optional.empty();
255             }
256
257             final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
258
259             // Now check session match
260             if (envelope.getSessionId() != te.getSessionId()) {
261                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
262                 return Optional.empty();
263             }
264             if (envelope.getTxSequence() != te.getTxSequence()) {
265                 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
266                 return Optional.empty();
267             }
268
269             LOG.debug("Completing request {} with {}", request, envelope);
270             it.remove();
271             return Optional.of(te);
272         }
273
274         return null;
275     }
276
277     private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
278         for (ConnectionEntry e : queue) {
279             final Request<?, ?> request = e.getRequest();
280             LOG.trace("Poisoning request {}", request, cause);
281             e.complete(request.toRequestFailure(cause));
282         }
283         queue.clear();
284     }
285
286 }