BUG-5280: implement message queueing
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / actors / client / SequencedQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.actors.client;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Ticker;
13 import java.util.Deque;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.Optional;
17 import java.util.concurrent.CompletionStage;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.Nullable;
20 import javax.annotation.concurrent.NotThreadSafe;
21 import org.opendaylight.controller.cluster.access.concepts.Request;
22 import org.opendaylight.controller.cluster.access.concepts.RequestException;
23 import org.opendaylight.controller.cluster.access.concepts.Response;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.duration.FiniteDuration;
27
28 /*
29  * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
30  *       retries and enqueues work as expected.
31  */
32 @NotThreadSafe
33 final class SequencedQueue {
34     private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
35
36     // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
37     @VisibleForTesting
38     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
39     @VisibleForTesting
40     static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
41     private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
42         TimeUnit.NANOSECONDS);
43
44     /**
45      * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
46      * progress at different speeds, these may be completed out of order.
47      *
48      * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
49      *       figure out a more efficient lookup mechanism to deal with responses which do not match the queue
50      *       order.
51      */
52     private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
53     private final Ticker ticker;
54     private final Long cookie;
55
56     // Updated/consulted from actor context only
57     /**
58      * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
59      * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
60      * result.
61      */
62     private CompletionStage<? extends BackendInfo> backendProof;
63     private BackendInfo backend;
64
65     /**
66      * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
67      */
68     private Object expectingTimer;
69
70     private long lastProgress;
71
72     // Updated from application thread
73     private volatile boolean notClosed = true;
74
75     SequencedQueue(final Long cookie, final Ticker ticker) {
76         this.cookie = Preconditions.checkNotNull(cookie);
77         this.ticker = Preconditions.checkNotNull(ticker);
78         lastProgress = ticker.read();
79     }
80
81     Long getCookie() {
82         return cookie;
83     }
84
85     private void checkNotClosed() {
86         Preconditions.checkState(notClosed, "Queue %s is closed", this);
87     }
88
89     /**
90      * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
91      * the following scenarios:
92      * 1) The request has been enqueued and transmitted. No further actions are necessary
93      * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
94      * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
95      *    process needs to complete before transmission occurs
96      *
97      * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
98      * the scenarios above according to the following rules:
99      * - if is null, the first case applies
100      * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
101      *      resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
102      * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
103      *
104      * @param request Request to be sent
105      * @param callback Callback to be invoked
106      * @return Optional duration with semantics described above.
107      */
108     @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
109         final long now = ticker.read();
110         final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
111
112         // We could have check first, but argument checking needs to happen first
113         checkNotClosed();
114         queue.add(e);
115         LOG.debug("Enqueued request {} to queue {}", request, this);
116
117         if (backend == null) {
118             return Optional.empty();
119         }
120
121         e.retransmit(backend, now);
122         if (expectingTimer == null) {
123             expectingTimer = now + REQUEST_TIMEOUT_NANOS;
124             return Optional.of(INITIAL_REQUEST_TIMEOUT);
125         } else {
126             return null;
127         }
128     }
129
130     ClientActorBehavior complete(final ClientActorBehavior current, final Response<?, ?> response) {
131         // Responses to different targets may arrive out of order, hence we use an iterator
132         final Iterator<SequencedQueueEntry> it = queue.iterator();
133         while (it.hasNext()) {
134             final SequencedQueueEntry e = it.next();
135             if (e.acceptsResponse(response)) {
136                 lastProgress = ticker.read();
137                 it.remove();
138                 LOG.debug("Completing request {} with {}", e, response);
139                 return e.complete(response);
140             }
141         }
142
143         LOG.debug("No request matching {} found", response);
144         return current;
145     }
146
147     Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
148         if (!proof.equals(backendProof)) {
149             LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
150             return Optional.empty();
151         }
152
153         this.backend = Preconditions.checkNotNull(backend);
154         backendProof = null;
155         LOG.debug("Resolved backend {}",  backend);
156
157         if (queue.isEmpty()) {
158             // No pending requests, hence no need for a timer
159             return Optional.empty();
160         }
161
162         LOG.debug("Resending requests to backend {}", backend);
163         final long now = ticker.read();
164         for (SequencedQueueEntry e : queue) {
165             e.retransmit(backend, now);
166         }
167
168         if (expectingTimer != null) {
169             // We already have a timer going, no need to schedule a new one
170             return Optional.empty();
171         }
172
173         // Above loop may have cost us some time. Recalculate timeout.
174         final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
175         expectingTimer = nextTicks;
176         return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
177     }
178
179     boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
180         if (!proof.equals(backendProof)) {
181             LOG.debug("Setting resolution handle to {}", proof);
182             backendProof = proof;
183             return true;
184         } else {
185             LOG.trace("Already resolving handle {}", proof);
186             return false;
187         }
188     }
189
190     boolean hasCompleted() {
191         return !notClosed && queue.isEmpty();
192     }
193
194     /**
195      * Check queue timeouts and return true if a timeout has occured.
196      *
197      * @return True if a timeout occured
198      * @throws NoProgressException if the queue failed to make progress for an extended
199      *                             time.
200      */
201     boolean runTimeout() throws NoProgressException {
202         expectingTimer = null;
203         final long now = ticker.read();
204
205         if (!queue.isEmpty()) {
206             final long ticksSinceProgress = now - lastProgress;
207             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
208                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
209                     TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
210
211                 final NoProgressException ex = new NoProgressException(ticksSinceProgress);
212                 poison(ex);
213                 throw ex;
214             }
215         }
216
217         // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
218         final SequencedQueueEntry head = queue.peek();
219         if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
220             backend = null;
221             LOG.debug("Queue {} invalidated backend info", this);
222             return true;
223         } else {
224             return false;
225         }
226     }
227
228     void poison(final RequestException cause) {
229         close();
230
231         SequencedQueueEntry e = queue.poll();
232         while (e != null) {
233             e.poison(cause);
234             e = queue.poll();
235         }
236     }
237
238     // FIXME: add a caller from ClientSingleTransaction
239     void close() {
240         notClosed = false;
241     }
242 }