2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.utils.concurrent;
18 import java.util.LinkedList;
19 import java.util.Queue;
20 import java.util.concurrent.CompletableFuture;
21 import java.util.concurrent.CompletionStage;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.function.BiConsumer;
27 import java.util.function.BiFunction;
28 import java.util.function.Consumer;
29 import java.util.function.Function;
32 * A {@link CompletableFuture} that ensures callbacks are called in FIFO order.
34 * The default {@link CompletableFuture} does not guarantee the ordering of callbacks, and indeed appears to
35 * execute them in LIFO order.
37 public class OrderedFuture<T> extends CompletableFuture<T> {
40 * Wraps the given future in a new blockable future.
42 * @param future the future to wrap
43 * @param <T> the future value type
44 * @return a new blockable future
46 public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
47 CompletableFuture<T> newFuture = new OrderedFuture<>();
48 future.whenComplete((result, error) -> {
50 newFuture.complete(result);
52 newFuture.completeExceptionally(error);
// Fallback context returned by getThreadContext() when ThreadContext.currentContext() yields null.
58 private static final ThreadContext NULL_CONTEXT = new NullThreadContext();
// Stage futures handed out by orderedFuture(), kept in insertion order; complete(T, Throwable)
// iterates and then clears this queue. Both methods synchronize on the queue itself.
60 private final Queue<CompletableFuture<T>> orderedFutures = new LinkedList<>();
// Set to true by complete(T, Throwable).
61 private volatile boolean complete;
// Value recorded by complete(T, Throwable); used to build pre-completed stages in orderedFuture().
62 private volatile T result;
// Failure used by orderedFuture() to build exceptionally completed stages.
// NOTE(review): no assignment to this field is visible in this listing — confirm it is set in complete(...).
63 private volatile Throwable error;
/**
 * Creates a new ordered future.
 *
 * <p>A completion callback is registered directly on the superclass so that, whenever this
 * future finishes, the private {@code complete(T, Throwable)} method is invoked with the outcome.
 */
public OrderedFuture() {
    super.whenComplete((value, failure) -> complete(value, failure));
}
69 private ThreadContext getThreadContext() {
70 ThreadContext context = ThreadContext.currentContext();
71 return context != null ? context : NULL_CONTEXT;
/**
 * Overrides {@link CompletableFuture#get()}.
 *
 * <p>NOTE(review): only the thread-context lookup below is visible in this listing; the rest of
 * the body (how the context is used and how the result is obtained) must be confirmed against
 * the full file.
 *
 * @return the result value
 * @throws InterruptedException as declared by the overridden method
 * @throws ExecutionException as declared by the overridden method
 */
75 public T get() throws InterruptedException, ExecutionException {
// Resolve the caller's thread context; getThreadContext() never returns null.
76 ThreadContext context = getThreadContext();
/**
 * Overrides {@link CompletableFuture#get(long, TimeUnit)} and delegates the actual wait to the
 * superclass implementation.
 *
 * <p>NOTE(review): the statements between the context lookup and the delegating return are not
 * visible in this listing — confirm how {@code context} is used against the full file.
 *
 * @param timeout the maximum time to wait, passed through to the superclass
 * @param unit the unit of {@code timeout}, passed through to the superclass
 * @return the value returned by {@code super.get(timeout, unit)}
 * @throws InterruptedException as declared by the overridden method
 * @throws ExecutionException as declared by the overridden method
 * @throws TimeoutException as declared by the overridden method
 */
86 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// Resolve the caller's thread context; getThreadContext() never returns null.
87 ThreadContext context = getThreadContext();
// The timed wait itself is performed by CompletableFuture.
90 return super.get(timeout, unit);
/**
 * Overrides {@link CompletableFuture#join()}.
 *
 * <p>NOTE(review): this method is declared {@code synchronized}, so the caller holds this
 * future's monitor for the duration of the call, whereas neither {@code get()} overload is
 * synchronized — confirm that the asymmetry is intentional.
 *
 * <p>NOTE(review): only the thread-context lookup below is visible in this listing; the rest of
 * the body must be confirmed against the full file.
 *
 * @return the result value
 */
97 public synchronized T join() {
// Resolve the caller's thread context; getThreadContext() never returns null.
98 ThreadContext context = getThreadContext();
108 * Adds a new ordered future.
110 private CompletableFuture<T> orderedFuture() {
112 synchronized (orderedFutures) {
114 CompletableFuture<T> future = new CompletableFuture<>();
115 orderedFutures.add(future);
123 return CompletableFuture.completedFuture(result);
125 return Futures.exceptionalFuture(error);
130 * Completes futures in FIFO order.
132 private void complete(T result, Throwable error) {
133 synchronized (orderedFutures) {
134 this.result = result;
136 this.complete = true;
138 for (CompletableFuture<T> future : orderedFutures) {
139 future.complete(result);
142 for (CompletableFuture<T> future : orderedFutures) {
143 future.completeExceptionally(error);
146 orderedFutures.clear();
151 public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
152 return wrap(orderedFuture().thenApply(fn));
156 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
157 return wrap(orderedFuture().thenApplyAsync(fn));
161 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
162 return wrap(orderedFuture().thenApplyAsync(fn, executor));
166 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
167 return wrap(orderedFuture().thenAccept(action));
171 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
172 return wrap(orderedFuture().thenAcceptAsync(action));
176 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
177 return wrap(orderedFuture().thenAcceptAsync(action, executor));
181 public CompletableFuture<Void> thenRun(Runnable action) {
182 return wrap(orderedFuture().thenRun(action));
186 public CompletableFuture<Void> thenRunAsync(Runnable action) {
187 return wrap(orderedFuture().thenRunAsync(action));
191 public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
192 return wrap(orderedFuture().thenRunAsync(action, executor));
196 public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
197 return wrap(orderedFuture().thenCombine(other, fn));
201 public <U, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
202 return wrap(orderedFuture().thenCombineAsync(other, fn));
206 public <U, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) {
207 return wrap(orderedFuture().thenCombineAsync(other, fn, executor));
211 public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) {
212 return wrap(orderedFuture().thenAcceptBoth(other, action));
216 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) {
217 return wrap(orderedFuture().thenAcceptBothAsync(other, action));
221 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) {
222 return wrap(orderedFuture().thenAcceptBothAsync(other, action, executor));
226 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
227 return wrap(orderedFuture().runAfterBoth(other, action));
231 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
232 return wrap(orderedFuture().runAfterBothAsync(other, action));
236 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
237 return wrap(orderedFuture().runAfterBothAsync(other, action, executor));
241 public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
242 return wrap(orderedFuture().applyToEither(other, fn));
246 public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {
247 return wrap(orderedFuture().applyToEitherAsync(other, fn));
251 public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) {
252 return wrap(orderedFuture().applyToEitherAsync(other, fn, executor));
256 public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
257 return wrap(orderedFuture().acceptEither(other, action));
261 public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
262 return wrap(orderedFuture().acceptEitherAsync(other, action));
266 public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) {
267 return wrap(orderedFuture().acceptEitherAsync(other, action, executor));
271 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
272 return wrap(orderedFuture().runAfterEither(other, action));
276 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
277 return wrap(orderedFuture().runAfterEitherAsync(other, action));
281 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) {
282 return wrap(orderedFuture().runAfterEitherAsync(other, action, executor));
286 public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
287 return wrap(orderedFuture().thenCompose(fn));
291 public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
292 return wrap(orderedFuture().thenComposeAsync(fn));
296 public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) {
297 return wrap(orderedFuture().thenComposeAsync(fn, executor));
301 public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
302 return wrap(orderedFuture().whenComplete(action));
306 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
307 return wrap(orderedFuture().whenCompleteAsync(action));
311 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
312 return wrap(orderedFuture().whenCompleteAsync(action, executor));
316 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
317 return wrap(orderedFuture().handle(fn));
321 public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {
322 return wrap(orderedFuture().handleAsync(fn));
326 public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
327 return wrap(orderedFuture().handleAsync(fn, executor));
331 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
332 return wrap(orderedFuture().exceptionally(fn));
336 public CompletableFuture<T> toCompletableFuture() {