Poison entries outside of main lock
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Deque;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Optional;
22 import java.util.Queue;
23 import javax.annotation.concurrent.NotThreadSafe;
24 import org.opendaylight.controller.cluster.access.concepts.Request;
25 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
26 import org.opendaylight.controller.cluster.access.concepts.Response;
27 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
28 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
29 import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
30 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
31 import org.opendaylight.controller.cluster.messaging.SliceOptions;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This queue is internally split into two queues for performance reasons, both memory efficiency and copy
37  * operations.
38  *
39  * <p>
40  * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
41  * complete in the order in which they were sent -- hence the head of the queue does not increase linearly,
42  * but can involve spurious removals of non-head entries.
43  *
44  * <p>
45  * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
46  * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
47  * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
48  *
49  * <p>
50  * Note that in transient case of reconnect, when the backend gives us a lower number of maximum in-flight entries
51  * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional
52  * scenario, hence we consciously ignore it to keep the design relatively simple.
53  *
54  * <p>
55  * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
56  *
57  * @author Robert Varga
58  */
59 @NotThreadSafe
60 abstract class TransmitQueue {
61     static final class Halted extends TransmitQueue {
62         // For ConnectingClientConnection.
63         Halted(final int targetDepth) {
64             super(targetDepth);
65         }
66
67         // For ReconnectingClientConnection.
68         Halted(final TransmitQueue oldQueue, final long now) {
69             super(oldQueue, now);
70         }
71
72         @Override
73         int canTransmitCount(final int inflightSize) {
74             return 0;
75         }
76
77         @Override
78         Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
79             throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
80         }
81
82         @Override
83         void preComplete(ResponseEnvelope<?> envelope) {
84         }
85     }
86
87     static final class Transmitting extends TransmitQueue {
88         private static final long NOT_SLICING = -1;
89
90         private final BackendInfo backend;
91         private final MessageSlicer messageSlicer;
92         private long nextTxSequence;
93         private long currentSlicedEnvSequenceId = NOT_SLICING;
94
95         // For ConnectedClientConnection.
96         Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
97                 final MessageSlicer messageSlicer) {
98             super(oldQueue, targetDepth, now);
99             this.backend = Preconditions.checkNotNull(backend);
100             this.messageSlicer = Preconditions.checkNotNull(messageSlicer);
101         }
102
103         @Override
104         int canTransmitCount(final int inflightSize) {
105             return backend.getMaxMessages() - inflightSize;
106         }
107
108         @Override
109         Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
110             // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
111             // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
112             // Optional to indicate the request was not transmitted.
113             if (currentSlicedEnvSequenceId >= 0) {
114                 return Optional.empty();
115             }
116
117             final Request<?, ?> request = entry.getRequest();
118             final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
119                 backend.getSessionId(), nextTxSequence++);
120
121             if (request instanceof SliceableMessage) {
122                 if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
123                         .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
124                         .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
125                                 "Failed to slice request " + request, t), 0L)).build())) {
126                     // The request was sliced so record the envelope sequence id to prevent transmitting
127                     // subsequent requests until slicing completes.
128                     currentSlicedEnvSequenceId = env.getTxSequence();
129                 }
130             } else {
131                 backend.getActor().tell(env, ActorRef.noSender());
132             }
133
134             return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
135                     env.getTxSequence(), now));
136         }
137
138         @Override
139         void preComplete(ResponseEnvelope<?> envelope) {
140             if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
141                 // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
142                 // requests to be transmitted.
143                 currentSlicedEnvSequenceId = NOT_SLICING;
144             }
145         }
146     }
147
148     private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
149
150     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
151     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
152     private final AveragingProgressTracker tracker;  // Cannot be just ProgressTracker as we are inheriting limits.
153     private ReconnectForwarder successor;
154
155     /**
156      * Construct initial transmitting queue.
157      */
158     TransmitQueue(final int targetDepth) {
159         tracker = new AveragingProgressTracker(targetDepth);
160     }
161
162     /**
163      * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
164      */
165     TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
166         tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
167     }
168
169     /**
170      * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
171      */
172     TransmitQueue(final TransmitQueue oldQueue, final long now) {
173         tracker = new AveragingProgressTracker(oldQueue.tracker, now);
174     }
175
176     /**
177      * Cancel the accumulated sum of delays as we expect the new backend to work now.
178      */
179     void cancelDebt(final long now) {
180         tracker.cancelDebt(now);
181     }
182
183     /**
184      * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
185      * to be added to it during replay. When we set the successor all entries enqueued between when this methods
186      * returns and the successor is set will be replayed to the successor.
187      *
188      * @return Collection of entries present in the queue.
189      */
190     final Collection<ConnectionEntry> drain() {
191         final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
192         ret.addAll(inflight);
193         ret.addAll(pending);
194         inflight.clear();
195         pending.clear();
196         return ret;
197     }
198
199     final long ticksStalling(final long now) {
200         return tracker.ticksStalling(now);
201     }
202
203     final boolean hasSuccessor() {
204         return successor != null;
205     }
206
207     // If a matching request was found, this will track a task was closed.
208     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
209         preComplete(envelope);
210
211         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
212         if (maybeEntry == null) {
213             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
214             maybeEntry = findMatchingEntry(pending, envelope);
215         }
216
217         if (maybeEntry == null || !maybeEntry.isPresent()) {
218             LOG.warn("No request matching {} found, ignoring response", envelope);
219             return Optional.empty();
220         }
221
222         final TransmittedConnectionEntry entry = maybeEntry.get();
223         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
224
225         // We have freed up a slot, try to transmit something
226         tryTransmit(now);
227
228         return Optional.of(entry);
229     }
230
231     final void tryTransmit(final long now) {
232         final int toSend = canTransmitCount(inflight.size());
233         if (toSend > 0 && !pending.isEmpty()) {
234             transmitEntries(toSend, now);
235         }
236     }
237
238     private void transmitEntries(final int maxTransmit, final long now) {
239         for (int i = 0; i < maxTransmit; ++i) {
240             final ConnectionEntry e = pending.poll();
241             if (e == null || !transmitEntry(e, now)) {
242                 LOG.debug("Queue {} transmitted {} requests", this, i);
243                 return;
244             }
245         }
246
247         LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
248     }
249
250     private boolean transmitEntry(final ConnectionEntry entry, final long now) {
251         LOG.debug("Queue {} transmitting entry {}", this, entry);
252         // We are not thread-safe and are supposed to be externally-guarded,
253         // hence send-before-record should be fine.
254         // This needs to be revisited if the external guards are lowered.
255         final Optional<TransmittedConnectionEntry> maybeTransmitted = transmit(entry, now);
256         if (!maybeTransmitted.isPresent()) {
257             return false;
258         }
259
260         inflight.addLast(maybeTransmitted.get());
261         return true;
262     }
263
264     final long enqueueOrForward(final ConnectionEntry entry, final long now) {
265         if (successor != null) {
266             // This call will pay the enqueuing price, hence the caller does not have to
267             successor.forwardEntry(entry, now);
268             return 0;
269         }
270
271         return enqueue(entry, now);
272     }
273
274     final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
275         if (successor != null) {
276             successor.replayEntry(entry, now);
277         } else {
278             enqueue(entry, now);
279         }
280     }
281
282     /**
283      * Enqueue an entry, possibly also transmitting it.
284      *
285      * @return Delay to be forced on the calling thread, in nanoseconds.
286      */
287     private long enqueue(final ConnectionEntry entry, final long now) {
288
289         // XXX: we should place a guard against incorrect entry sequences:
290         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
291
292         // Reserve an entry before we do anything that can fail
293         final long delay = tracker.openTask(now);
294
295         /*
296          * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
297          * to have available send slots and non-empty pending queue.
298          */
299         final int toSend = canTransmitCount(inflight.size());
300         if (toSend <= 0) {
301             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
302             pending.addLast(entry);
303             return delay;
304         }
305
306         if (pending.isEmpty()) {
307             if (!transmitEntry(entry, now)) {
308                 LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
309                 pending.addLast(entry);
310             }
311
312             return delay;
313         }
314
315         pending.addLast(entry);
316         transmitEntries(toSend, now);
317         return delay;
318     }
319
320     /**
321      * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
322      */
323     abstract int canTransmitCount(int inflightSize);
324
325     abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now);
326
327     abstract void preComplete(ResponseEnvelope<?> envelope);
328
329     final boolean isEmpty() {
330         return inflight.isEmpty() && pending.isEmpty();
331     }
332
333     final ConnectionEntry peek() {
334         final ConnectionEntry ret = inflight.peek();
335         if (ret != null) {
336             return ret;
337         }
338
339         return pending.peek();
340     }
341
342     final List<ConnectionEntry> poison() {
343         final List<ConnectionEntry> entries = new ArrayList<>(inflight.size() + pending.size());
344         entries.addAll(inflight);
345         inflight.clear();
346         entries.addAll(pending);
347         pending.clear();
348         return entries;
349     }
350
351     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
352         Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this);
353         successor = Preconditions.checkNotNull(forwarder);
354         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
355
356         /*
357          * We need to account for entries which have been added between the time drain() was called and this method
358          * is invoked. Since the old connection is visible during replay and some entries may have completed on the
359          * replay thread, there was an avenue for this to happen.
360          */
361         int count = 0;
362         ConnectionEntry entry = inflight.poll();
363         while (entry != null) {
364             successor.replayEntry(entry, now);
365             entry = inflight.poll();
366             count++;
367         }
368
369         entry = pending.poll();
370         while (entry != null) {
371             successor.replayEntry(entry, now);
372             entry = pending.poll();
373             count++;
374         }
375
376         LOG.debug("Connection {} queue spliced {} messages", this, count);
377     }
378
379     final void remove(final long now) {
380         final TransmittedConnectionEntry txe = inflight.poll();
381         if (txe == null) {
382             final ConnectionEntry entry = pending.pop();
383             tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
384         } else {
385             tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
386         }
387     }
388
389     @VisibleForTesting
390     Deque<TransmittedConnectionEntry> getInflight() {
391         return inflight;
392     }
393
394     @VisibleForTesting
395     Deque<ConnectionEntry> getPending() {
396         return pending;
397     }
398
399     /*
400      * We are using tri-state return here to indicate one of three conditions:
401      * - if a matching entry is found, return an Optional containing it
402      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
403      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
404      */
405     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
406             justification = "Returning null Optional is documented in the API contract.")
407     private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
408             final ResponseEnvelope<?> envelope) {
409         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
410         // to use an iterator
411         final Iterator<? extends ConnectionEntry> it = queue.iterator();
412         while (it.hasNext()) {
413             final ConnectionEntry e = it.next();
414             final Request<?, ?> request = e.getRequest();
415             final Response<?, ?> response = envelope.getMessage();
416
417             // First check for matching target, or move to next entry
418             if (!request.getTarget().equals(response.getTarget())) {
419                 continue;
420             }
421
422             // Sanity-check logical sequence, ignore any out-of-order messages
423             if (request.getSequence() != response.getSequence()) {
424                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
425                 return Optional.empty();
426             }
427
428             // Check if the entry has (ever) been transmitted
429             if (!(e instanceof TransmittedConnectionEntry)) {
430                 return Optional.empty();
431             }
432
433             final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
434
435             // Now check session match
436             if (envelope.getSessionId() != te.getSessionId()) {
437                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
438                 return Optional.empty();
439             }
440             if (envelope.getTxSequence() != te.getTxSequence()) {
441                 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
442                 return Optional.empty();
443             }
444
445             LOG.debug("Completing request {} with {}", request, envelope);
446             it.remove();
447             return Optional.of(te);
448         }
449
450         return null;
451     }
452 }