e4ee78c5391754c7020b1a385311c077f74c9479
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Deque;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Optional;
22 import java.util.Queue;
23 import org.opendaylight.controller.cluster.access.concepts.Request;
24 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
25 import org.opendaylight.controller.cluster.access.concepts.Response;
26 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
27 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
28 import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
29 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
30 import org.opendaylight.controller.cluster.messaging.SliceOptions;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * This queue is internally split into two queues for performance reasons, both memory efficiency and copy
36  * operations.
37  *
38  * <p>
39  * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
40  * complete in the order in which they were sent -- hence the head of the queue does not increase linearly,
41  * but can involve spurious removals of non-head entries.
42  *
43  * <p>
44  * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
45  * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
46  * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
47  *
48  * <p>
49  * Note that in transient case of reconnect, when the backend gives us a lower number of maximum in-flight entries
50  * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional
51  * scenario, hence we consciously ignore it to keep the design relatively simple.
52  *
53  * <p>
54  * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
55  *
56  * @author Robert Varga
57  */
58 abstract class TransmitQueue {
59     static final class Halted extends TransmitQueue {
60         // For ConnectingClientConnection.
61         Halted(final int targetDepth) {
62             super(targetDepth);
63         }
64
65         // For ReconnectingClientConnection.
66         Halted(final TransmitQueue oldQueue, final long now) {
67             super(oldQueue, now);
68         }
69
70         @Override
71         int canTransmitCount(final int inflightSize) {
72             return 0;
73         }
74
75         @Override
76         Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
77             throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
78         }
79
80         @Override
81         void preComplete(final ResponseEnvelope<?> envelope) {
82         }
83     }
84
85     static final class Transmitting extends TransmitQueue {
86         private static final long NOT_SLICING = -1;
87
88         private final BackendInfo backend;
89         private final MessageSlicer messageSlicer;
90         private long nextTxSequence;
91         private long currentSlicedEnvSequenceId = NOT_SLICING;
92
93         // For ConnectedClientConnection.
94         Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
95                 final MessageSlicer messageSlicer) {
96             super(oldQueue, targetDepth, now);
97             this.backend = Preconditions.checkNotNull(backend);
98             this.messageSlicer = Preconditions.checkNotNull(messageSlicer);
99         }
100
101         @Override
102         int canTransmitCount(final int inflightSize) {
103             return backend.getMaxMessages() - inflightSize;
104         }
105
106         @Override
107         Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
108             // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
109             // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
110             // Optional to indicate the request was not transmitted.
111             if (currentSlicedEnvSequenceId >= 0) {
112                 return Optional.empty();
113             }
114
115             final Request<?, ?> request = entry.getRequest();
116             final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
117                 backend.getSessionId(), nextTxSequence++);
118
119             if (request instanceof SliceableMessage) {
120                 if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
121                         .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
122                         .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
123                                 "Failed to slice request " + request, t), 0L)).build())) {
124                     // The request was sliced so record the envelope sequence id to prevent transmitting
125                     // subsequent requests until slicing completes.
126                     currentSlicedEnvSequenceId = env.getTxSequence();
127                 }
128             } else {
129                 backend.getActor().tell(env, ActorRef.noSender());
130             }
131
132             return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
133                     env.getTxSequence(), now));
134         }
135
136         @Override
137         void preComplete(final ResponseEnvelope<?> envelope) {
138             if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
139                 // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
140                 // requests to be transmitted.
141                 currentSlicedEnvSequenceId = NOT_SLICING;
142             }
143         }
144     }
145
146     private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
147
148     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
149     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
150     private final AveragingProgressTracker tracker;  // Cannot be just ProgressTracker as we are inheriting limits.
151     private ReconnectForwarder successor;
152
153     /**
154      * Construct initial transmitting queue.
155      */
156     TransmitQueue(final int targetDepth) {
157         tracker = new AveragingProgressTracker(targetDepth);
158     }
159
160     /**
161      * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
162      */
163     TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
164         tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
165     }
166
167     /**
168      * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
169      */
170     TransmitQueue(final TransmitQueue oldQueue, final long now) {
171         tracker = new AveragingProgressTracker(oldQueue.tracker, now);
172     }
173
174     /**
175      * Cancel the accumulated sum of delays as we expect the new backend to work now.
176      */
177     void cancelDebt(final long now) {
178         tracker.cancelDebt(now);
179     }
180
181     /**
182      * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
183      * to be added to it during replay. When we set the successor all entries enqueued between when this methods
184      * returns and the successor is set will be replayed to the successor.
185      *
186      * @return Collection of entries present in the queue.
187      */
188     final Collection<ConnectionEntry> drain() {
189         final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
190         ret.addAll(inflight);
191         ret.addAll(pending);
192         inflight.clear();
193         pending.clear();
194         return ret;
195     }
196
197     final long ticksStalling(final long now) {
198         return tracker.ticksStalling(now);
199     }
200
201     final boolean hasSuccessor() {
202         return successor != null;
203     }
204
205     // If a matching request was found, this will track a task was closed.
206     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
207         preComplete(envelope);
208
209         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
210         if (maybeEntry == null) {
211             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
212             maybeEntry = findMatchingEntry(pending, envelope);
213         }
214
215         if (maybeEntry == null || !maybeEntry.isPresent()) {
216             LOG.warn("No request matching {} found, ignoring response", envelope);
217             return Optional.empty();
218         }
219
220         final TransmittedConnectionEntry entry = maybeEntry.get();
221         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
222
223         // We have freed up a slot, try to transmit something
224         tryTransmit(now);
225
226         return Optional.of(entry);
227     }
228
229     final void tryTransmit(final long now) {
230         final int toSend = canTransmitCount(inflight.size());
231         if (toSend > 0 && !pending.isEmpty()) {
232             transmitEntries(toSend, now);
233         }
234     }
235
236     private void transmitEntries(final int maxTransmit, final long now) {
237         for (int i = 0; i < maxTransmit; ++i) {
238             final ConnectionEntry e = pending.poll();
239             if (e == null || !transmitEntry(e, now)) {
240                 LOG.debug("Queue {} transmitted {} requests", this, i);
241                 return;
242             }
243         }
244
245         LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
246     }
247
248     private boolean transmitEntry(final ConnectionEntry entry, final long now) {
249         LOG.debug("Queue {} transmitting entry {}", this, entry);
250         // We are not thread-safe and are supposed to be externally-guarded,
251         // hence send-before-record should be fine.
252         // This needs to be revisited if the external guards are lowered.
253         final Optional<TransmittedConnectionEntry> maybeTransmitted = transmit(entry, now);
254         if (!maybeTransmitted.isPresent()) {
255             return false;
256         }
257
258         inflight.addLast(maybeTransmitted.get());
259         return true;
260     }
261
262     final long enqueueOrForward(final ConnectionEntry entry, final long now) {
263         if (successor != null) {
264             // This call will pay the enqueuing price, hence the caller does not have to
265             successor.forwardEntry(entry, now);
266             return 0;
267         }
268
269         return enqueue(entry, now);
270     }
271
272     final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
273         if (successor != null) {
274             successor.replayEntry(entry, now);
275         } else {
276             enqueue(entry, now);
277         }
278     }
279
280     /**
281      * Enqueue an entry, possibly also transmitting it.
282      *
283      * @return Delay to be forced on the calling thread, in nanoseconds.
284      */
285     private long enqueue(final ConnectionEntry entry, final long now) {
286
287         // XXX: we should place a guard against incorrect entry sequences:
288         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
289
290         // Reserve an entry before we do anything that can fail
291         final long delay = tracker.openTask(now);
292
293         /*
294          * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
295          * to have available send slots and non-empty pending queue.
296          */
297         final int toSend = canTransmitCount(inflight.size());
298         if (toSend <= 0) {
299             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
300             pending.addLast(entry);
301             return delay;
302         }
303
304         if (pending.isEmpty()) {
305             if (!transmitEntry(entry, now)) {
306                 LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
307                 pending.addLast(entry);
308             }
309
310             return delay;
311         }
312
313         pending.addLast(entry);
314         transmitEntries(toSend, now);
315         return delay;
316     }
317
318     /**
319      * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
320      */
321     abstract int canTransmitCount(int inflightSize);
322
323     abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now);
324
325     abstract void preComplete(ResponseEnvelope<?> envelope);
326
327     final boolean isEmpty() {
328         return inflight.isEmpty() && pending.isEmpty();
329     }
330
331     final ConnectionEntry peek() {
332         final ConnectionEntry ret = inflight.peek();
333         if (ret != null) {
334             return ret;
335         }
336
337         return pending.peek();
338     }
339
340     final List<ConnectionEntry> poison() {
341         final List<ConnectionEntry> entries = new ArrayList<>(inflight.size() + pending.size());
342         entries.addAll(inflight);
343         inflight.clear();
344         entries.addAll(pending);
345         pending.clear();
346         return entries;
347     }
348
349     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
350         Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this);
351         successor = Preconditions.checkNotNull(forwarder);
352         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
353
354         /*
355          * We need to account for entries which have been added between the time drain() was called and this method
356          * is invoked. Since the old connection is visible during replay and some entries may have completed on the
357          * replay thread, there was an avenue for this to happen.
358          */
359         int count = 0;
360         ConnectionEntry entry = inflight.poll();
361         while (entry != null) {
362             successor.replayEntry(entry, now);
363             entry = inflight.poll();
364             count++;
365         }
366
367         entry = pending.poll();
368         while (entry != null) {
369             successor.replayEntry(entry, now);
370             entry = pending.poll();
371             count++;
372         }
373
374         LOG.debug("Connection {} queue spliced {} messages", this, count);
375     }
376
377     final void remove(final long now) {
378         final TransmittedConnectionEntry txe = inflight.poll();
379         if (txe == null) {
380             final ConnectionEntry entry = pending.pop();
381             tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
382         } else {
383             tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
384         }
385     }
386
387     @VisibleForTesting
388     Deque<TransmittedConnectionEntry> getInflight() {
389         return inflight;
390     }
391
392     @VisibleForTesting
393     Deque<ConnectionEntry> getPending() {
394         return pending;
395     }
396
397     /*
398      * We are using tri-state return here to indicate one of three conditions:
399      * - if a matching entry is found, return an Optional containing it
400      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
401      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
402      */
403     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
404             justification = "Returning null Optional is documented in the API contract.")
405     private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
406             final ResponseEnvelope<?> envelope) {
407         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
408         // to use an iterator
409         final Iterator<? extends ConnectionEntry> it = queue.iterator();
410         while (it.hasNext()) {
411             final ConnectionEntry e = it.next();
412             final Request<?, ?> request = e.getRequest();
413             final Response<?, ?> response = envelope.getMessage();
414
415             // First check for matching target, or move to next entry
416             if (!request.getTarget().equals(response.getTarget())) {
417                 continue;
418             }
419
420             // Sanity-check logical sequence, ignore any out-of-order messages
421             if (request.getSequence() != response.getSequence()) {
422                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
423                 return Optional.empty();
424             }
425
426             // Check if the entry has (ever) been transmitted
427             if (!(e instanceof TransmittedConnectionEntry)) {
428                 return Optional.empty();
429             }
430
431             final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
432
433             // Now check session match
434             if (envelope.getSessionId() != te.getSessionId()) {
435                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
436                 return Optional.empty();
437             }
438             if (envelope.getTxSequence() != te.getTxSequence()) {
439                 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
440                 return Optional.empty();
441             }
442
443             LOG.debug("Completing request {} with {}", request, envelope);
444             it.remove();
445             return Optional.of(te);
446         }
447
448         return null;
449     }
450 }