Slice front-end request messages
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Collection;
17 import java.util.Deque;
18 import java.util.Iterator;
19 import java.util.Optional;
20 import java.util.Queue;
21 import javax.annotation.concurrent.NotThreadSafe;
22 import org.opendaylight.controller.cluster.access.concepts.Request;
23 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
24 import org.opendaylight.controller.cluster.access.concepts.RequestException;
25 import org.opendaylight.controller.cluster.access.concepts.Response;
26 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
27 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
28 import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
29 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
30 import org.opendaylight.controller.cluster.messaging.SliceOptions;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * This queue is internally split into two queues for performance reasons, both memory efficiency and copy
36  * operations.
37  *
38  * <p>
39  * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
40  * complete in the order in which they were sent -- hence the head of the queue does not increase linearly,
41  * but can involve spurious removals of non-head entries.
42  *
43  * <p>
44  * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
45  * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
46  * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
47  *
48  * <p>
49  * Note that in transient case of reconnect, when the backend gives us a lower number of maximum in-flight entries
50  * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional
51  * scenario, hence we consciously ignore it to keep the design relatively simple.
52  *
53  * <p>
54  * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
55  *
56  * @author Robert Varga
57  */
58 @NotThreadSafe
59 abstract class TransmitQueue {
60     static final class Halted extends TransmitQueue {
61         // For ConnectingClientConnection.
62         Halted(final int targetDepth) {
63             super(targetDepth);
64         }
65
66         // For ReconnectingClientConnection.
67         Halted(final TransmitQueue oldQueue, final long now) {
68             super(oldQueue, now);
69         }
70
71         @Override
72         int canTransmitCount(final int inflightSize) {
73             return 0;
74         }
75
76         @Override
77         Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
78             throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
79         }
80
81         @Override
82         void preComplete(ResponseEnvelope<?> envelope) {
83         }
84     }
85
86     static final class Transmitting extends TransmitQueue {
87         private static final long NOT_SLICING = -1;
88
89         private final BackendInfo backend;
90         private final MessageSlicer messageSlicer;
91         private long nextTxSequence;
92         private long currentSlicedEnvSequenceId = NOT_SLICING;
93
94         // For ConnectedClientConnection.
95         Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
96                 final MessageSlicer messageSlicer) {
97             super(oldQueue, targetDepth, now);
98             this.backend = Preconditions.checkNotNull(backend);
99             this.messageSlicer = Preconditions.checkNotNull(messageSlicer);
100         }
101
102         @Override
103         int canTransmitCount(final int inflightSize) {
104             return backend.getMaxMessages() - inflightSize;
105         }
106
107         @Override
108         Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
109             // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
110             // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
111             // Optional to indicate the request was not transmitted.
112             if (currentSlicedEnvSequenceId >= 0) {
113                 return Optional.empty();
114             }
115
116             final Request<?, ?> request = entry.getRequest();
117             final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
118                 backend.getSessionId(), nextTxSequence++);
119
120             if (request instanceof SliceableMessage) {
121                 if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
122                         .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
123                         .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
124                                 "Failed to slice request " + request, t), 0L)).build())) {
125                     // The request was sliced so record the envelope sequence id to prevent transmitting
126                     // subsequent requests until slicing completes.
127                     currentSlicedEnvSequenceId = env.getTxSequence();
128                 }
129             } else {
130                 backend.getActor().tell(env, ActorRef.noSender());
131             }
132
133             return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
134                     env.getTxSequence(), now));
135         }
136
137         @Override
138         void preComplete(ResponseEnvelope<?> envelope) {
139             if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
140                 // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
141                 // requests to be transmitted.
142                 currentSlicedEnvSequenceId = NOT_SLICING;
143             }
144         }
145     }
146
147     private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
148
149     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
150     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
151     private final AveragingProgressTracker tracker;  // Cannot be just ProgressTracker as we are inheriting limits.
152     private ReconnectForwarder successor;
153
154     /**
155      * Construct initial transmitting queue.
156      */
157     TransmitQueue(final int targetDepth) {
158         tracker = new AveragingProgressTracker(targetDepth);
159     }
160
161     /**
162      * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
163      */
164     TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
165         tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
166     }
167
168     /**
169      * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
170      */
171     TransmitQueue(final TransmitQueue oldQueue, final long now) {
172         tracker = new AveragingProgressTracker(oldQueue.tracker, now);
173     }
174
175     /**
176      * Cancel the accumulated sum of delays as we expect the new backend to work now.
177      */
178     void cancelDebt(final long now) {
179         tracker.cancelDebt(now);
180     }
181
182     /**
183      * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
184      * to be added to it during replay. When we set the successor all entries enqueued between when this methods
185      * returns and the successor is set will be replayed to the successor.
186      *
187      * @return Collection of entries present in the queue.
188      */
189     final Collection<ConnectionEntry> drain() {
190         final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
191         ret.addAll(inflight);
192         ret.addAll(pending);
193         inflight.clear();
194         pending.clear();
195         return ret;
196     }
197
198     final long ticksStalling(final long now) {
199         return tracker.ticksStalling(now);
200     }
201
202     final boolean hasSuccessor() {
203         return successor != null;
204     }
205
206     // If a matching request was found, this will track a task was closed.
207     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
208         preComplete(envelope);
209
210         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
211         if (maybeEntry == null) {
212             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
213             maybeEntry = findMatchingEntry(pending, envelope);
214         }
215
216         if (maybeEntry == null || !maybeEntry.isPresent()) {
217             LOG.warn("No request matching {} found, ignoring response", envelope);
218             return Optional.empty();
219         }
220
221         final TransmittedConnectionEntry entry = maybeEntry.get();
222         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
223
224         // We have freed up a slot, try to transmit something
225         tryTransmit(now);
226
227         return Optional.of(entry);
228     }
229
230     final void tryTransmit(final long now) {
231         final int toSend = canTransmitCount(inflight.size());
232         if (toSend > 0 && !pending.isEmpty()) {
233             transmitEntries(toSend, now);
234         }
235     }
236
237     private void transmitEntries(final int maxTransmit, final long now) {
238         for (int i = 0; i < maxTransmit; ++i) {
239             final ConnectionEntry e = pending.poll();
240             if (e == null || !transmitEntry(e, now)) {
241                 LOG.debug("Queue {} transmitted {} requests", this, i);
242                 return;
243             }
244         }
245
246         LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
247     }
248
249     private boolean transmitEntry(final ConnectionEntry entry, final long now) {
250         LOG.debug("Queue {} transmitting entry {}", this, entry);
251         // We are not thread-safe and are supposed to be externally-guarded,
252         // hence send-before-record should be fine.
253         // This needs to be revisited if the external guards are lowered.
254         final Optional<TransmittedConnectionEntry> maybeTransmitted = transmit(entry, now);
255         if (!maybeTransmitted.isPresent()) {
256             return false;
257         }
258
259         inflight.addLast(maybeTransmitted.get());
260         return true;
261     }
262
263     final long enqueueOrForward(final ConnectionEntry entry, final long now) {
264         if (successor != null) {
265             // This call will pay the enqueuing price, hence the caller does not have to
266             successor.forwardEntry(entry, now);
267             return 0;
268         }
269
270         return enqueue(entry, now);
271     }
272
273     final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
274         if (successor != null) {
275             successor.replayEntry(entry, now);
276         } else {
277             enqueue(entry, now);
278         }
279     }
280
281     /**
282      * Enqueue an entry, possibly also transmitting it.
283      *
284      * @return Delay to be forced on the calling thread, in nanoseconds.
285      */
286     private long enqueue(final ConnectionEntry entry, final long now) {
287
288         // XXX: we should place a guard against incorrect entry sequences:
289         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
290
291         // Reserve an entry before we do anything that can fail
292         final long delay = tracker.openTask(now);
293
294         /*
295          * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
296          * to have available send slots and non-empty pending queue.
297          */
298         final int toSend = canTransmitCount(inflight.size());
299         if (toSend <= 0) {
300             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
301             pending.addLast(entry);
302             return delay;
303         }
304
305         if (pending.isEmpty()) {
306             if (!transmitEntry(entry, now)) {
307                 LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
308                 pending.addLast(entry);
309             }
310
311             return delay;
312         }
313
314         pending.addLast(entry);
315         transmitEntries(toSend, now);
316         return delay;
317     }
318
319     /**
320      * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
321      */
322     abstract int canTransmitCount(int inflightSize);
323
324     abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now);
325
326     abstract void preComplete(ResponseEnvelope<?> envelope);
327
328     final boolean isEmpty() {
329         return inflight.isEmpty() && pending.isEmpty();
330     }
331
332     final ConnectionEntry peek() {
333         final ConnectionEntry ret = inflight.peek();
334         if (ret != null) {
335             return ret;
336         }
337
338         return pending.peek();
339     }
340
341     final void poison(final RequestException cause) {
342         poisonQueue(inflight, cause);
343         poisonQueue(pending, cause);
344     }
345
346     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
347         Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this);
348         successor = Preconditions.checkNotNull(forwarder);
349         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
350
351         /*
352          * We need to account for entries which have been added between the time drain() was called and this method
353          * is invoked. Since the old connection is visible during replay and some entries may have completed on the
354          * replay thread, there was an avenue for this to happen.
355          */
356         int count = 0;
357         ConnectionEntry entry = inflight.poll();
358         while (entry != null) {
359             successor.replayEntry(entry, now);
360             entry = inflight.poll();
361             count++;
362         }
363
364         entry = pending.poll();
365         while (entry != null) {
366             successor.replayEntry(entry, now);
367             entry = pending.poll();
368             count++;
369         }
370
371         LOG.debug("Connection {} queue spliced {} messages", this, count);
372     }
373
374     final void remove(final long now) {
375         final TransmittedConnectionEntry txe = inflight.poll();
376         if (txe == null) {
377             final ConnectionEntry entry = pending.pop();
378             tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
379         } else {
380             tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
381         }
382     }
383
384     @VisibleForTesting
385     Deque<TransmittedConnectionEntry> getInflight() {
386         return inflight;
387     }
388
389     @VisibleForTesting
390     Deque<ConnectionEntry> getPending() {
391         return pending;
392     }
393
394     /*
395      * We are using tri-state return here to indicate one of three conditions:
396      * - if a matching entry is found, return an Optional containing it
397      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
398      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
399      */
400     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
401             justification = "Returning null Optional is documented in the API contract.")
402     private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
403             final ResponseEnvelope<?> envelope) {
404         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
405         // to use an iterator
406         final Iterator<? extends ConnectionEntry> it = queue.iterator();
407         while (it.hasNext()) {
408             final ConnectionEntry e = it.next();
409             final Request<?, ?> request = e.getRequest();
410             final Response<?, ?> response = envelope.getMessage();
411
412             // First check for matching target, or move to next entry
413             if (!request.getTarget().equals(response.getTarget())) {
414                 continue;
415             }
416
417             // Sanity-check logical sequence, ignore any out-of-order messages
418             if (request.getSequence() != response.getSequence()) {
419                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
420                 return Optional.empty();
421             }
422
423             // Check if the entry has (ever) been transmitted
424             if (!(e instanceof TransmittedConnectionEntry)) {
425                 return Optional.empty();
426             }
427
428             final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
429
430             // Now check session match
431             if (envelope.getSessionId() != te.getSessionId()) {
432                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
433                 return Optional.empty();
434             }
435             if (envelope.getTxSequence() != te.getTxSequence()) {
436                 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
437                 return Optional.empty();
438             }
439
440             LOG.debug("Completing request {} with {}", request, envelope);
441             it.remove();
442             return Optional.of(te);
443         }
444
445         return null;
446     }
447
448     private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
449         for (ConnectionEntry e : queue) {
450             final Request<?, ?> request = e.getRequest();
451             LOG.trace("Poisoning request {}", request, cause);
452             e.complete(request.toRequestFailure(cause));
453         }
454         queue.clear();
455     }
456 }