b7543410cd1a63128ac2da7ffde001cc9b3d778f
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Collection;
17 import java.util.Deque;
18 import java.util.Iterator;
19 import java.util.Optional;
20 import java.util.Queue;
21 import javax.annotation.concurrent.NotThreadSafe;
22 import org.opendaylight.controller.cluster.access.concepts.Request;
23 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
24 import org.opendaylight.controller.cluster.access.concepts.RequestException;
25 import org.opendaylight.controller.cluster.access.concepts.Response;
26 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * This queue is internally split into two queues for performance reasons, both memory efficiency and copy
32  * operations.
33  *
34  * <p>
35  * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
36  * complete in the order in which they were sent -- hence the head of the queue does not increase linearly,
37  * but can involve spurious removals of non-head entries.
38  *
39  * <p>
40  * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
41  * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
42  * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
43  *
44  * <p>
45  * Note that in transient case of reconnect, when the backend gives us a lower number of maximum in-flight entries
46  * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional
47  * scenario, hence we consciously ignore it to keep the design relatively simple.
48  *
49  * <p>
50  * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
51  *
52  * @author Robert Varga
53  */
54 @NotThreadSafe
55 abstract class TransmitQueue {
56     static final class Halted extends TransmitQueue {
57         Halted(final int targetDepth) {
58             super(targetDepth);
59         }
60
61         @Override
62         int canTransmitCount(final int inflightSize) {
63             return 0;
64         }
65
66         @Override
67         TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
68             throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
69         }
70     }
71
72     static final class Transmitting extends TransmitQueue {
73         private final BackendInfo backend;
74         private long nextTxSequence;
75
76         Transmitting(final int targetDepth, final BackendInfo backend) {
77             super(targetDepth);
78             this.backend = Preconditions.checkNotNull(backend);
79         }
80
81         @Override
82         int canTransmitCount(final int inflightSize) {
83             return backend.getMaxMessages() - inflightSize;
84         }
85
86         @Override
87         TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
88             final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
89                 backend.getSessionId(), nextTxSequence++);
90
91             final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
92                 env.getTxSequence(), now);
93             backend.getActor().tell(env, ActorRef.noSender());
94             return ret;
95         }
96     }
97
98     private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
99
100     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
101     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
102     private final ProgressTracker tracker;
103     private ReconnectForwarder successor;
104
105     TransmitQueue(final int targetDepth) {
106         tracker = new AveragingProgressTracker(targetDepth);
107     }
108
109     /**
110      * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
111      * to be added to it during replay. When we set the successor all entries enqueued between when this methods
112      * returns and the successor is set will be replayed to the successor.
113      *
114      * @return Collection of entries present in the queue.
115      */
116     final Collection<ConnectionEntry> drain() {
117         final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
118         ret.addAll(inflight);
119         ret.addAll(pending);
120         inflight.clear();
121         pending.clear();
122         return ret;
123     }
124
125     final long ticksStalling(final long now) {
126         return tracker.ticksStalling(now);
127     }
128
129     final boolean hasSuccessor() {
130         return successor != null;
131     }
132
133     // If a matching request was found, this will track a task was closed.
134     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
135         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
136         if (maybeEntry == null) {
137             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
138             maybeEntry = findMatchingEntry(pending, envelope);
139         }
140
141         if (maybeEntry == null || !maybeEntry.isPresent()) {
142             LOG.warn("No request matching {} found, ignoring response", envelope);
143             return Optional.empty();
144         }
145
146         final TransmittedConnectionEntry entry = maybeEntry.get();
147         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
148
149         // We have freed up a slot, try to transmit something
150         tryTransmit(now);
151
152         return Optional.of(entry);
153     }
154
155     final void tryTransmit(final long now) {
156         final int toSend = canTransmitCount(inflight.size());
157         if (toSend > 0 && !pending.isEmpty()) {
158             transmitEntries(toSend, now);
159         }
160     }
161
162     private void transmitEntries(final int maxTransmit, final long now) {
163         for (int i = 0; i < maxTransmit; ++i) {
164             final ConnectionEntry e = pending.poll();
165             if (e == null) {
166                 LOG.debug("Queue {} transmitted {} requests", this, i);
167                 return;
168             }
169
170             transmitEntry(e, now);
171         }
172
173         LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
174     }
175
176     private void transmitEntry(final ConnectionEntry entry, final long now) {
177         LOG.debug("Queue {} transmitting entry {}", entry);
178         // We are not thread-safe and are supposed to be externally-guarded,
179         // hence send-before-record should be fine.
180         // This needs to be revisited if the external guards are lowered.
181         inflight.addLast(transmit(entry, now));
182     }
183
184     /**
185      * Enqueue an entry, possibly also transmitting it.
186      *
187      * @return Delay to be forced on the calling thread, in nanoseconds.
188      */
189     final long enqueue(final ConnectionEntry entry, final long now) {
190         if (successor != null) {
191             successor.forwardEntry(entry, now);
192             return 0;
193         }
194
195         // XXX: we should place a guard against incorrect entry sequences:
196         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
197
198         // Reserve an entry before we do anything that can fail
199         final long delay = tracker.openTask(now);
200
201         /*
202          * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
203          * to have available send slots and non-empty pending queue.
204          */
205         final int toSend = canTransmitCount(inflight.size());
206         if (toSend <= 0) {
207             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
208             pending.addLast(entry);
209             return delay;
210         }
211
212         if (pending.isEmpty()) {
213             transmitEntry(entry, now);
214             return delay;
215         }
216
217         pending.addLast(entry);
218         transmitEntries(toSend, now);
219         return delay;
220     }
221
222     /**
223      * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
224      */
225     abstract int canTransmitCount(int inflightSize);
226
227     abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
228
229     final boolean isEmpty() {
230         return inflight.isEmpty() && pending.isEmpty();
231     }
232
233     final ConnectionEntry peek() {
234         final ConnectionEntry ret = inflight.peek();
235         if (ret != null) {
236             return ret;
237         }
238
239         return pending.peek();
240     }
241
242     final void poison(final RequestException cause) {
243         poisonQueue(inflight, cause);
244         poisonQueue(pending, cause);
245     }
246
247     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
248         Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
249         successor = Preconditions.checkNotNull(forwarder);
250         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
251
252         /*
253          * We need to account for entries which have been added between the time drain() was called and this method
254          * is invoked. Since the old connection is visible during replay and some entries may have completed on the
255          * replay thread, there was an avenue for this to happen.
256          */
257         int count = 0;
258         ConnectionEntry entry = inflight.poll();
259         while (entry != null) {
260             successor.forwardEntry(entry, now);
261             entry = inflight.poll();
262             count++;
263         }
264
265         entry = pending.poll();
266         while (entry != null) {
267             successor.forwardEntry(entry, now);
268             entry = pending.poll();
269             count++;
270         }
271
272         LOG.debug("Connection {} queue spliced {} messages", this, count);
273     }
274
275     final void remove(final long now) {
276         final TransmittedConnectionEntry txe = inflight.poll();
277         if (txe == null) {
278             final ConnectionEntry entry = pending.pop();
279             tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
280         } else {
281             tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
282         }
283     }
284
285     @VisibleForTesting
286     Deque<TransmittedConnectionEntry> getInflight() {
287         return inflight;
288     }
289
290     @VisibleForTesting
291     Deque<ConnectionEntry> getPending() {
292         return pending;
293     }
294
295     /*
296      * We are using tri-state return here to indicate one of three conditions:
297      * - if a matching entry is found, return an Optional containing it
298      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
299      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
300      */
301     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
302             justification = "Returning null Optional is documented in the API contract.")
303     private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
304             final ResponseEnvelope<?> envelope) {
305         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
306         // to use an iterator
307         final Iterator<? extends ConnectionEntry> it = queue.iterator();
308         while (it.hasNext()) {
309             final ConnectionEntry e = it.next();
310             final Request<?, ?> request = e.getRequest();
311             final Response<?, ?> response = envelope.getMessage();
312
313             // First check for matching target, or move to next entry
314             if (!request.getTarget().equals(response.getTarget())) {
315                 continue;
316             }
317
318             // Sanity-check logical sequence, ignore any out-of-order messages
319             if (request.getSequence() != response.getSequence()) {
320                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
321                 return Optional.empty();
322             }
323
324             // Check if the entry has (ever) been transmitted
325             if (!(e instanceof TransmittedConnectionEntry)) {
326                 return Optional.empty();
327             }
328
329             final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
330
331             // Now check session match
332             if (envelope.getSessionId() != te.getSessionId()) {
333                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
334                 return Optional.empty();
335             }
336             if (envelope.getTxSequence() != te.getTxSequence()) {
337                 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
338                 return Optional.empty();
339             }
340
341             LOG.debug("Completing request {} with {}", request, envelope);
342             it.remove();
343             return Optional.of(te);
344         }
345
346         return null;
347     }
348
349     private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
350         for (ConnectionEntry e : queue) {
351             final Request<?, ?> request = e.getRequest();
352             LOG.trace("Poisoning request {}", request, cause);
353             e.complete(request.toRequestFailure(cause));
354         }
355         queue.clear();
356     }
357 }