2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.actors.client;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Ticker;
13 import java.util.Deque;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.Optional;
17 import java.util.concurrent.CompletionStage;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.Nullable;
20 import javax.annotation.concurrent.NotThreadSafe;
21 import org.opendaylight.controller.cluster.access.concepts.Request;
22 import org.opendaylight.controller.cluster.access.concepts.RequestException;
23 import org.opendaylight.controller.cluster.access.concepts.Response;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.duration.FiniteDuration;
29 * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
30 * retries and enqueues work as expected.
33 final class SequencedQueue {
34 private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
36 // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
38 static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
40 static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
41 private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
42 TimeUnit.NANOSECONDS);
45 * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
46 * progress at different speeds, these may be completed out of order.
48 * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
49 * figure out a more efficient lookup mechanism to deal with responses which do not match the queue
52 private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
53 private final Ticker ticker;
54 private final Long cookie;
56 // Updated/consulted from actor context only
58 * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
59 * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
62 private CompletionStage<? extends BackendInfo> backendProof;
63 private BackendInfo backend;
66 * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
68 private Object expectingTimer;
70 private long lastProgress;
72 // Updated from application thread
73 private volatile boolean notClosed = true;
75 SequencedQueue(final Long cookie, final Ticker ticker) {
76 this.cookie = Preconditions.checkNotNull(cookie);
77 this.ticker = Preconditions.checkNotNull(ticker);
78 lastProgress = ticker.read();
85 private void checkNotClosed() {
86 Preconditions.checkState(notClosed, "Queue %s is closed", this);
90 * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
91 * the following scenarios:
92 * 1) The request has been enqueued and transmitted. No further actions are necessary
93 * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
94 * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
95 * process needs to complete before transmission occurs
97 * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
98 * the scenarios above according to the following rules:
99 * - if is null, the first case applies
100 * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
101 * resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
102 * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
104 * @param request Request to be sent
105 * @param callback Callback to be invoked
106 * @return Optional duration with semantics described above.
108 @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
109 final long now = ticker.read();
110 final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
112 // We could have check first, but argument checking needs to happen first
115 LOG.debug("Enqueued request {} to queue {}", request, this);
117 if (backend == null) {
118 return Optional.empty();
121 e.retransmit(backend, now);
122 if (expectingTimer == null) {
123 expectingTimer = now + REQUEST_TIMEOUT_NANOS;
124 return Optional.of(INITIAL_REQUEST_TIMEOUT);
130 ClientActorBehavior complete(final ClientActorBehavior current, final Response<?, ?> response) {
131 // Responses to different targets may arrive out of order, hence we use an iterator
132 final Iterator<SequencedQueueEntry> it = queue.iterator();
133 while (it.hasNext()) {
134 final SequencedQueueEntry e = it.next();
135 if (e.acceptsResponse(response)) {
136 lastProgress = ticker.read();
138 LOG.debug("Completing request {} with {}", e, response);
139 return e.complete(response);
143 LOG.debug("No request matching {} found", response);
147 Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
148 if (!proof.equals(backendProof)) {
149 LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
150 return Optional.empty();
153 this.backend = Preconditions.checkNotNull(backend);
155 LOG.debug("Resolved backend {}", backend);
157 if (queue.isEmpty()) {
158 // No pending requests, hence no need for a timer
159 return Optional.empty();
162 LOG.debug("Resending requests to backend {}", backend);
163 final long now = ticker.read();
164 for (SequencedQueueEntry e : queue) {
165 e.retransmit(backend, now);
168 if (expectingTimer != null) {
169 // We already have a timer going, no need to schedule a new one
170 return Optional.empty();
173 // Above loop may have cost us some time. Recalculate timeout.
174 final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
175 expectingTimer = nextTicks;
176 return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
179 boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
180 if (!proof.equals(backendProof)) {
181 LOG.debug("Setting resolution handle to {}", proof);
182 backendProof = proof;
185 LOG.trace("Already resolving handle {}", proof);
190 boolean hasCompleted() {
191 return !notClosed && queue.isEmpty();
195 * Check queue timeouts and return true if a timeout has occured.
197 * @return True if a timeout occured
198 * @throws NoProgressException if the queue failed to make progress for an extended
201 boolean runTimeout() throws NoProgressException {
202 expectingTimer = null;
203 final long now = ticker.read();
205 if (!queue.isEmpty()) {
206 final long ticksSinceProgress = now - lastProgress;
207 if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
208 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
209 TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
211 final NoProgressException ex = new NoProgressException(ticksSinceProgress);
217 // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
218 final SequencedQueueEntry head = queue.peek();
219 if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
221 LOG.debug("Queue {} invalidated backend info", this);
228 void poison(final RequestException cause) {
231 SequencedQueueEntry e = queue.poll();
238 // FIXME: add a caller from ClientSingleTransaction