Bump odlparent to 5.0.0
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Collection;
17 import java.util.Deque;
18 import java.util.Iterator;
19 import java.util.Optional;
20 import java.util.Queue;
21 import org.opendaylight.controller.cluster.access.concepts.Request;
22 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
23 import org.opendaylight.controller.cluster.access.concepts.RequestException;
24 import org.opendaylight.controller.cluster.access.concepts.Response;
25 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
26 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
27 import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
28 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
29 import org.opendaylight.controller.cluster.messaging.SliceOptions;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * This queue is internally split into two queues for performance reasons, both memory efficiency and copy
35  * operations.
36  *
37  * <p>
38  * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
39  * complete in the order in which they were sent -- hence the head of the queue does not increase linearly,
40  * but can involve spurious removals of non-head entries.
41  *
42  * <p>
43  * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
44  * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
45  * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
46  *
47  * <p>
48  * Note that in transient case of reconnect, when the backend gives us a lower number of maximum in-flight entries
49  * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional
50  * scenario, hence we consciously ignore it to keep the design relatively simple.
51  *
52  * <p>
53  * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
54  *
55  * @author Robert Varga
56  */
57 abstract class TransmitQueue {
58     static final class Halted extends TransmitQueue {
59         // For ConnectingClientConnection.
60         Halted(final int targetDepth) {
61             super(targetDepth);
62         }
63
64         // For ReconnectingClientConnection.
65         Halted(final TransmitQueue oldQueue, final long now) {
66             super(oldQueue, now);
67         }
68
69         @Override
70         int canTransmitCount(final int inflightSize) {
71             return 0;
72         }
73
74         @Override
75         Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
76             throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
77         }
78
79         @Override
80         void preComplete(final ResponseEnvelope<?> envelope) {
81         }
82     }
83
84     static final class Transmitting extends TransmitQueue {
85         private static final long NOT_SLICING = -1;
86
87         private final BackendInfo backend;
88         private final MessageSlicer messageSlicer;
89         private long nextTxSequence;
90         private long currentSlicedEnvSequenceId = NOT_SLICING;
91
92         // For ConnectedClientConnection.
93         Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
94                 final MessageSlicer messageSlicer) {
95             super(oldQueue, targetDepth, now);
96             this.backend = Preconditions.checkNotNull(backend);
97             this.messageSlicer = Preconditions.checkNotNull(messageSlicer);
98         }
99
100         @Override
101         int canTransmitCount(final int inflightSize) {
102             return backend.getMaxMessages() - inflightSize;
103         }
104
105         @Override
106         Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
107             // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
108             // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
109             // Optional to indicate the request was not transmitted.
110             if (currentSlicedEnvSequenceId >= 0) {
111                 return Optional.empty();
112             }
113
114             final Request<?, ?> request = entry.getRequest();
115             final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
116                 backend.getSessionId(), nextTxSequence++);
117
118             if (request instanceof SliceableMessage) {
119                 if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
120                         .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
121                         .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
122                                 "Failed to slice request " + request, t), 0L)).build())) {
123                     // The request was sliced so record the envelope sequence id to prevent transmitting
124                     // subsequent requests until slicing completes.
125                     currentSlicedEnvSequenceId = env.getTxSequence();
126                 }
127             } else {
128                 backend.getActor().tell(env, ActorRef.noSender());
129             }
130
131             return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
132                     env.getTxSequence(), now));
133         }
134
135         @Override
136         void preComplete(final ResponseEnvelope<?> envelope) {
137             if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
138                 // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
139                 // requests to be transmitted.
140                 currentSlicedEnvSequenceId = NOT_SLICING;
141             }
142         }
143     }
144
145     private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
146
147     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
148     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
149     private final AveragingProgressTracker tracker;  // Cannot be just ProgressTracker as we are inheriting limits.
150     private ReconnectForwarder successor;
151
152     /**
153      * Construct initial transmitting queue.
154      */
155     TransmitQueue(final int targetDepth) {
156         tracker = new AveragingProgressTracker(targetDepth);
157     }
158
159     /**
160      * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
161      */
162     TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
163         tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
164     }
165
166     /**
167      * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
168      */
169     TransmitQueue(final TransmitQueue oldQueue, final long now) {
170         tracker = new AveragingProgressTracker(oldQueue.tracker, now);
171     }
172
173     /**
174      * Cancel the accumulated sum of delays as we expect the new backend to work now.
175      */
176     void cancelDebt(final long now) {
177         tracker.cancelDebt(now);
178     }
179
180     /**
181      * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
182      * to be added to it during replay. When we set the successor all entries enqueued between when this methods
183      * returns and the successor is set will be replayed to the successor.
184      *
185      * @return Collection of entries present in the queue.
186      */
187     final Collection<ConnectionEntry> drain() {
188         final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
189         ret.addAll(inflight);
190         ret.addAll(pending);
191         inflight.clear();
192         pending.clear();
193         return ret;
194     }
195
196     final long ticksStalling(final long now) {
197         return tracker.ticksStalling(now);
198     }
199
200     final boolean hasSuccessor() {
201         return successor != null;
202     }
203
204     // If a matching request was found, this will track a task was closed.
205     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
206         preComplete(envelope);
207
208         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
209         if (maybeEntry == null) {
210             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
211             maybeEntry = findMatchingEntry(pending, envelope);
212         }
213
214         if (maybeEntry == null || !maybeEntry.isPresent()) {
215             LOG.warn("No request matching {} found, ignoring response", envelope);
216             return Optional.empty();
217         }
218
219         final TransmittedConnectionEntry entry = maybeEntry.get();
220         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
221
222         // We have freed up a slot, try to transmit something
223         tryTransmit(now);
224
225         return Optional.of(entry);
226     }
227
228     final void tryTransmit(final long now) {
229         final int toSend = canTransmitCount(inflight.size());
230         if (toSend > 0 && !pending.isEmpty()) {
231             transmitEntries(toSend, now);
232         }
233     }
234
235     private void transmitEntries(final int maxTransmit, final long now) {
236         for (int i = 0; i < maxTransmit; ++i) {
237             final ConnectionEntry e = pending.poll();
238             if (e == null || !transmitEntry(e, now)) {
239                 LOG.debug("Queue {} transmitted {} requests", this, i);
240                 return;
241             }
242         }
243
244         LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
245     }
246
247     private boolean transmitEntry(final ConnectionEntry entry, final long now) {
248         LOG.debug("Queue {} transmitting entry {}", this, entry);
249         // We are not thread-safe and are supposed to be externally-guarded,
250         // hence send-before-record should be fine.
251         // This needs to be revisited if the external guards are lowered.
252         final Optional<TransmittedConnectionEntry> maybeTransmitted = transmit(entry, now);
253         if (!maybeTransmitted.isPresent()) {
254             return false;
255         }
256
257         inflight.addLast(maybeTransmitted.get());
258         return true;
259     }
260
261     final long enqueueOrForward(final ConnectionEntry entry, final long now) {
262         if (successor != null) {
263             // This call will pay the enqueuing price, hence the caller does not have to
264             successor.forwardEntry(entry, now);
265             return 0;
266         }
267
268         return enqueue(entry, now);
269     }
270
271     final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
272         if (successor != null) {
273             successor.replayEntry(entry, now);
274         } else {
275             enqueue(entry, now);
276         }
277     }
278
279     /**
280      * Enqueue an entry, possibly also transmitting it.
281      *
282      * @return Delay to be forced on the calling thread, in nanoseconds.
283      */
284     private long enqueue(final ConnectionEntry entry, final long now) {
285
286         // XXX: we should place a guard against incorrect entry sequences:
287         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
288
289         // Reserve an entry before we do anything that can fail
290         final long delay = tracker.openTask(now);
291
292         /*
293          * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
294          * to have available send slots and non-empty pending queue.
295          */
296         final int toSend = canTransmitCount(inflight.size());
297         if (toSend <= 0) {
298             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
299             pending.addLast(entry);
300             return delay;
301         }
302
303         if (pending.isEmpty()) {
304             if (!transmitEntry(entry, now)) {
305                 LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
306                 pending.addLast(entry);
307             }
308
309             return delay;
310         }
311
312         pending.addLast(entry);
313         transmitEntries(toSend, now);
314         return delay;
315     }
316
317     /**
318      * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
319      */
320     abstract int canTransmitCount(int inflightSize);
321
322     abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now);
323
324     abstract void preComplete(ResponseEnvelope<?> envelope);
325
326     final boolean isEmpty() {
327         return inflight.isEmpty() && pending.isEmpty();
328     }
329
330     final ConnectionEntry peek() {
331         final ConnectionEntry ret = inflight.peek();
332         if (ret != null) {
333             return ret;
334         }
335
336         return pending.peek();
337     }
338
339     final void poison(final RequestException cause) {
340         poisonQueue(inflight, cause);
341         poisonQueue(pending, cause);
342     }
343
344     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
345         Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this);
346         successor = Preconditions.checkNotNull(forwarder);
347         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
348
349         /*
350          * We need to account for entries which have been added between the time drain() was called and this method
351          * is invoked. Since the old connection is visible during replay and some entries may have completed on the
352          * replay thread, there was an avenue for this to happen.
353          */
354         int count = 0;
355         ConnectionEntry entry = inflight.poll();
356         while (entry != null) {
357             successor.replayEntry(entry, now);
358             entry = inflight.poll();
359             count++;
360         }
361
362         entry = pending.poll();
363         while (entry != null) {
364             successor.replayEntry(entry, now);
365             entry = pending.poll();
366             count++;
367         }
368
369         LOG.debug("Connection {} queue spliced {} messages", this, count);
370     }
371
372     final void remove(final long now) {
373         final TransmittedConnectionEntry txe = inflight.poll();
374         if (txe == null) {
375             final ConnectionEntry entry = pending.pop();
376             tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
377         } else {
378             tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
379         }
380     }
381
382     @VisibleForTesting
383     Deque<TransmittedConnectionEntry> getInflight() {
384         return inflight;
385     }
386
387     @VisibleForTesting
388     Deque<ConnectionEntry> getPending() {
389         return pending;
390     }
391
392     /*
393      * We are using tri-state return here to indicate one of three conditions:
394      * - if a matching entry is found, return an Optional containing it
395      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
396      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
397      */
398     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
399             justification = "Returning null Optional is documented in the API contract.")
400     private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
401             final ResponseEnvelope<?> envelope) {
402         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
403         // to use an iterator
404         final Iterator<? extends ConnectionEntry> it = queue.iterator();
405         while (it.hasNext()) {
406             final ConnectionEntry e = it.next();
407             final Request<?, ?> request = e.getRequest();
408             final Response<?, ?> response = envelope.getMessage();
409
410             // First check for matching target, or move to next entry
411             if (!request.getTarget().equals(response.getTarget())) {
412                 continue;
413             }
414
415             // Sanity-check logical sequence, ignore any out-of-order messages
416             if (request.getSequence() != response.getSequence()) {
417                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
418                 return Optional.empty();
419             }
420
421             // Check if the entry has (ever) been transmitted
422             if (!(e instanceof TransmittedConnectionEntry)) {
423                 return Optional.empty();
424             }
425
426             final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
427
428             // Now check session match
429             if (envelope.getSessionId() != te.getSessionId()) {
430                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
431                 return Optional.empty();
432             }
433             if (envelope.getTxSequence() != te.getTxSequence()) {
434                 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
435                 return Optional.empty();
436             }
437
438             LOG.debug("Completing request {} with {}", request, envelope);
439             it.remove();
440             return Optional.of(te);
441         }
442
443         return null;
444     }
445
446     private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
447         for (ConnectionEntry e : queue) {
448             final Request<?, ?> request = e.getRequest();
449             LOG.trace("Poisoning request {}", request, cause);
450             e.complete(request.toRequestFailure(cause));
451         }
452         queue.clear();
453     }
454 }