Move MessageTrackerTest
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / actors / client / SequencedQueue.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.actors.client;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Ticker;
13 import java.util.Deque;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.Optional;
17 import java.util.concurrent.CompletionStage;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.Nullable;
20 import javax.annotation.concurrent.NotThreadSafe;
21 import org.opendaylight.controller.cluster.access.concepts.Request;
22 import org.opendaylight.controller.cluster.access.concepts.RequestException;
23 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.duration.FiniteDuration;
27
28 /*
29  * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
30  *       retries and enqueues work as expected.
31  */
32 @NotThreadSafe
33 final class SequencedQueue {
34     private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
35
36     // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
37     @VisibleForTesting
38     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
39     @VisibleForTesting
40     static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
41     private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
42         TimeUnit.NANOSECONDS);
43
44     /**
45      * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
46      * progress at different speeds, these may be completed out of order.
47      *
48      * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
49      *       figure out a more efficient lookup mechanism to deal with responses which do not match the queue
50      *       order.
51      */
52     private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
53     private final Ticker ticker;
54     private final Long cookie;
55
56     // Updated/consulted from actor context only
57     /**
58      * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
59      * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
60      * result.
61      */
62     private CompletionStage<? extends BackendInfo> backendProof;
63     private BackendInfo backend;
64
65     /**
66      * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
67      */
68     private Object expectingTimer;
69
70     private long lastProgress;
71
72     // Updated from application thread
73     private volatile boolean notClosed = true;
74
75     SequencedQueue(final Long cookie, final Ticker ticker) {
76         this.cookie = Preconditions.checkNotNull(cookie);
77         this.ticker = Preconditions.checkNotNull(ticker);
78         lastProgress = ticker.read();
79     }
80
81     Long getCookie() {
82         return cookie;
83     }
84
85     private void checkNotClosed() {
86         Preconditions.checkState(notClosed, "Queue %s is closed", this);
87     }
88
89     /**
90      * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
91      * the following scenarios:
92      * 1) The request has been enqueued and transmitted. No further actions are necessary
93      * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
94      * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
95      *    process needs to complete before transmission occurs
96      *
97      * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
98      * the scenarios above according to the following rules:
99      * - if is null, the first case applies
100      * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
101      *      resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
102      * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
103      *
104      * @param request Request to be sent
105      * @param callback Callback to be invoked
106      * @return Optional duration with semantics described above.
107      */
108     @Nullable Optional<FiniteDuration> enqueueRequest(final long sequence, final Request<?, ?> request,
109             final RequestCallback callback) {
110         checkNotClosed();
111
112         final long now = ticker.read();
113         final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now);
114
115         queue.add(e);
116         LOG.debug("Enqueued request {} to queue {}", request, this);
117
118         if (backend == null) {
119             return Optional.empty();
120         }
121
122         e.retransmit(backend, now);
123         if (expectingTimer == null) {
124             expectingTimer = now + REQUEST_TIMEOUT_NANOS;
125             return Optional.of(INITIAL_REQUEST_TIMEOUT);
126         } else {
127             return null;
128         }
129     }
130
131     ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
132         // Responses to different targets may arrive out of order, hence we use an iterator
133         final Iterator<SequencedQueueEntry> it = queue.iterator();
134         while (it.hasNext()) {
135             final SequencedQueueEntry e = it.next();
136             if (e.acceptsResponse(response)) {
137                 lastProgress = ticker.read();
138                 it.remove();
139                 LOG.debug("Completing request {} with {}", e, response);
140                 return e.complete(response.getMessage());
141             }
142         }
143
144         LOG.debug("No request matching {} found", response);
145         return current;
146     }
147
148     Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
149         if (!proof.equals(backendProof)) {
150             LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
151             return Optional.empty();
152         }
153
154         this.backend = Preconditions.checkNotNull(backend);
155         backendProof = null;
156         LOG.debug("Resolved backend {}",  backend);
157
158         if (queue.isEmpty()) {
159             // No pending requests, hence no need for a timer
160             return Optional.empty();
161         }
162
163         LOG.debug("Resending requests to backend {}", backend);
164         final long now = ticker.read();
165         for (SequencedQueueEntry e : queue) {
166             e.retransmit(backend, now);
167         }
168
169         if (expectingTimer != null) {
170             // We already have a timer going, no need to schedule a new one
171             return Optional.empty();
172         }
173
174         // Above loop may have cost us some time. Recalculate timeout.
175         final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
176         expectingTimer = nextTicks;
177         return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
178     }
179
180     boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
181         if (!proof.equals(backendProof)) {
182             LOG.debug("Setting resolution handle to {}", proof);
183             backendProof = proof;
184             return true;
185         } else {
186             LOG.trace("Already resolving handle {}", proof);
187             return false;
188         }
189     }
190
191     boolean hasCompleted() {
192         return !notClosed && queue.isEmpty();
193     }
194
195     /**
196      * Check queue timeouts and return true if a timeout has occured.
197      *
198      * @return True if a timeout occured
199      * @throws NoProgressException if the queue failed to make progress for an extended
200      *                             time.
201      */
202     boolean runTimeout() throws NoProgressException {
203         expectingTimer = null;
204         final long now = ticker.read();
205
206         if (!queue.isEmpty()) {
207             final long ticksSinceProgress = now - lastProgress;
208             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
209                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
210                     TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
211
212                 final NoProgressException ex = new NoProgressException(ticksSinceProgress);
213                 poison(ex);
214                 throw ex;
215             }
216         }
217
218         // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
219         final SequencedQueueEntry head = queue.peek();
220         if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
221             backend = null;
222             LOG.debug("Queue {} invalidated backend info", this);
223             return true;
224         } else {
225             return false;
226         }
227     }
228
229     void poison(final RequestException cause) {
230         close();
231
232         SequencedQueueEntry e = queue.poll();
233         while (e != null) {
234             e.poison(cause);
235             e = queue.poll();
236         }
237     }
238
239     // FIXME: add a caller from ClientSingleTransaction
240     void close() {
241         notClosed = false;
242     }
243 }