705bb0d9cd6e7a32da5f45884bc3d07b6f2592ed
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Collection;
17 import java.util.Deque;
18 import java.util.Iterator;
19 import java.util.Optional;
20 import java.util.Queue;
21 import javax.annotation.concurrent.NotThreadSafe;
22 import org.opendaylight.controller.cluster.access.concepts.Request;
23 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
24 import org.opendaylight.controller.cluster.access.concepts.RequestException;
25 import org.opendaylight.controller.cluster.access.concepts.Response;
26 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * This queue is internally split into two queues for performance reasons, both memory efficiency and copy
32  * operations.
33  *
34  * <p>
35  * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
36  * complete in the order in which they were sent -- hence the head of the queue does not increase linearly,
37  * but can involve spurious removals of non-head entries.
38  *
39  * <p>
40  * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
41  * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
42  * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
43  *
44  * <p>
45  * Note that in transient case of reconnect, when the backend gives us a lower number of maximum in-flight entries
46  * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional
47  * scenario, hence we consciously ignore it to keep the design relatively simple.
48  *
49  * <p>
50  * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
51  *
52  * @author Robert Varga
53  */
54 @NotThreadSafe
55 abstract class TransmitQueue {
56     static final class Halted extends TransmitQueue {
57         // For ConnectingClientConnection.
58         Halted(final int targetDepth) {
59             super(targetDepth);
60         }
61
62         // For ReconnectingClientConnection.
63         Halted(final TransmitQueue oldQueue, final long now) {
64             super(oldQueue, now);
65         }
66
67         @Override
68         int canTransmitCount(final int inflightSize) {
69             return 0;
70         }
71
72         @Override
73         TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
74             throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
75         }
76     }
77
78     static final class Transmitting extends TransmitQueue {
79         private final BackendInfo backend;
80         private long nextTxSequence;
81
82         // For ConnectedClientConnection.
83         Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) {
84             super(oldQueue, targetDepth, now);
85             this.backend = Preconditions.checkNotNull(backend);
86         }
87
88         @Override
89         int canTransmitCount(final int inflightSize) {
90             return backend.getMaxMessages() - inflightSize;
91         }
92
93         @Override
94         TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
95             final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
96                 backend.getSessionId(), nextTxSequence++);
97
98             final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
99                 env.getTxSequence(), now);
100             backend.getActor().tell(env, ActorRef.noSender());
101             return ret;
102         }
103     }
104
105     private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
106
107     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
108     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
109     private final AveragingProgressTracker tracker;  // Cannot be just ProgressTracker as we are inheriting limits.
110     private ReconnectForwarder successor;
111
112     /**
113      * Construct initial transmitting queue.
114      */
115     TransmitQueue(final int targetDepth) {
116         tracker = new AveragingProgressTracker(targetDepth);
117     }
118
119     /**
120      * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
121      */
122     TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
123         tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
124     }
125
126     /**
127      * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
128      */
129     TransmitQueue(final TransmitQueue oldQueue, final long now) {
130         tracker = new AveragingProgressTracker(oldQueue.tracker, now);
131     }
132
133     /**
134      * Cancel the accumulated sum of delays as we expect the new backend to work now.
135      */
136     void cancelDebt(final long now) {
137         tracker.cancelDebt(now);
138     }
139
140     /**
141      * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
142      * to be added to it during replay. When we set the successor all entries enqueued between when this methods
143      * returns and the successor is set will be replayed to the successor.
144      *
145      * @return Collection of entries present in the queue.
146      */
147     final Collection<ConnectionEntry> drain() {
148         final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
149         ret.addAll(inflight);
150         ret.addAll(pending);
151         inflight.clear();
152         pending.clear();
153         return ret;
154     }
155
156     final long ticksStalling(final long now) {
157         return tracker.ticksStalling(now);
158     }
159
160     final boolean hasSuccessor() {
161         return successor != null;
162     }
163
164     // If a matching request was found, this will track a task was closed.
165     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
166         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
167         if (maybeEntry == null) {
168             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
169             maybeEntry = findMatchingEntry(pending, envelope);
170         }
171
172         if (maybeEntry == null || !maybeEntry.isPresent()) {
173             LOG.warn("No request matching {} found, ignoring response", envelope);
174             return Optional.empty();
175         }
176
177         final TransmittedConnectionEntry entry = maybeEntry.get();
178         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
179
180         // We have freed up a slot, try to transmit something
181         tryTransmit(now);
182
183         return Optional.of(entry);
184     }
185
186     final void tryTransmit(final long now) {
187         final int toSend = canTransmitCount(inflight.size());
188         if (toSend > 0 && !pending.isEmpty()) {
189             transmitEntries(toSend, now);
190         }
191     }
192
193     private void transmitEntries(final int maxTransmit, final long now) {
194         for (int i = 0; i < maxTransmit; ++i) {
195             final ConnectionEntry e = pending.poll();
196             if (e == null) {
197                 LOG.debug("Queue {} transmitted {} requests", this, i);
198                 return;
199             }
200
201             transmitEntry(e, now);
202         }
203
204         LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
205     }
206
207     private void transmitEntry(final ConnectionEntry entry, final long now) {
208         LOG.debug("Queue {} transmitting entry {}", this, entry);
209         // We are not thread-safe and are supposed to be externally-guarded,
210         // hence send-before-record should be fine.
211         // This needs to be revisited if the external guards are lowered.
212         inflight.addLast(transmit(entry, now));
213     }
214
215     final long enqueueOrForward(final ConnectionEntry entry, final long now) {
216         if (successor != null) {
217             // This call will pay the enqueuing price, hence the caller does not have to
218             successor.forwardEntry(entry, now);
219             return 0;
220         }
221
222         return enqueue(entry, now);
223     }
224
225     final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
226         if (successor != null) {
227             successor.replayEntry(entry, now);
228         } else {
229             enqueue(entry, now);
230         }
231     }
232
233     /**
234      * Enqueue an entry, possibly also transmitting it.
235      *
236      * @return Delay to be forced on the calling thread, in nanoseconds.
237      */
238     private long enqueue(final ConnectionEntry entry, final long now) {
239
240         // XXX: we should place a guard against incorrect entry sequences:
241         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
242
243         // Reserve an entry before we do anything that can fail
244         final long delay = tracker.openTask(now);
245
246         /*
247          * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
248          * to have available send slots and non-empty pending queue.
249          */
250         final int toSend = canTransmitCount(inflight.size());
251         if (toSend <= 0) {
252             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
253             pending.addLast(entry);
254             return delay;
255         }
256
257         if (pending.isEmpty()) {
258             transmitEntry(entry, now);
259             return delay;
260         }
261
262         pending.addLast(entry);
263         transmitEntries(toSend, now);
264         return delay;
265     }
266
267     /**
268      * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
269      */
270     abstract int canTransmitCount(int inflightSize);
271
272     abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
273
274     final boolean isEmpty() {
275         return inflight.isEmpty() && pending.isEmpty();
276     }
277
278     final ConnectionEntry peek() {
279         final ConnectionEntry ret = inflight.peek();
280         if (ret != null) {
281             return ret;
282         }
283
284         return pending.peek();
285     }
286
287     final void poison(final RequestException cause) {
288         poisonQueue(inflight, cause);
289         poisonQueue(pending, cause);
290     }
291
292     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
293         Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this);
294         successor = Preconditions.checkNotNull(forwarder);
295         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
296
297         /*
298          * We need to account for entries which have been added between the time drain() was called and this method
299          * is invoked. Since the old connection is visible during replay and some entries may have completed on the
300          * replay thread, there was an avenue for this to happen.
301          */
302         int count = 0;
303         ConnectionEntry entry = inflight.poll();
304         while (entry != null) {
305             successor.replayEntry(entry, now);
306             entry = inflight.poll();
307             count++;
308         }
309
310         entry = pending.poll();
311         while (entry != null) {
312             successor.replayEntry(entry, now);
313             entry = pending.poll();
314             count++;
315         }
316
317         LOG.debug("Connection {} queue spliced {} messages", this, count);
318     }
319
320     final void remove(final long now) {
321         final TransmittedConnectionEntry txe = inflight.poll();
322         if (txe == null) {
323             final ConnectionEntry entry = pending.pop();
324             tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
325         } else {
326             tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
327         }
328     }
329
330     @VisibleForTesting
331     Deque<TransmittedConnectionEntry> getInflight() {
332         return inflight;
333     }
334
335     @VisibleForTesting
336     Deque<ConnectionEntry> getPending() {
337         return pending;
338     }
339
340     /*
341      * We are using tri-state return here to indicate one of three conditions:
342      * - if a matching entry is found, return an Optional containing it
343      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
344      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
345      */
346     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
347             justification = "Returning null Optional is documented in the API contract.")
348     private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
349             final ResponseEnvelope<?> envelope) {
350         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
351         // to use an iterator
352         final Iterator<? extends ConnectionEntry> it = queue.iterator();
353         while (it.hasNext()) {
354             final ConnectionEntry e = it.next();
355             final Request<?, ?> request = e.getRequest();
356             final Response<?, ?> response = envelope.getMessage();
357
358             // First check for matching target, or move to next entry
359             if (!request.getTarget().equals(response.getTarget())) {
360                 continue;
361             }
362
363             // Sanity-check logical sequence, ignore any out-of-order messages
364             if (request.getSequence() != response.getSequence()) {
365                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
366                 return Optional.empty();
367             }
368
369             // Check if the entry has (ever) been transmitted
370             if (!(e instanceof TransmittedConnectionEntry)) {
371                 return Optional.empty();
372             }
373
374             final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
375
376             // Now check session match
377             if (envelope.getSessionId() != te.getSessionId()) {
378                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
379                 return Optional.empty();
380             }
381             if (envelope.getTxSequence() != te.getTxSequence()) {
382                 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
383                 return Optional.empty();
384             }
385
386             LOG.debug("Completing request {} with {}", request, envelope);
387             it.remove();
388             return Optional.of(te);
389         }
390
391         return null;
392     }
393
394     private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
395         for (ConnectionEntry e : queue) {
396             final Request<?, ?> request = e.getRequest();
397             LOG.trace("Poisoning request {}", request, cause);
398             e.complete(request.toRequestFailure(cause));
399         }
400         queue.clear();
401     }
402 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.