/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package io.atomix.utils.concurrent;
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.function.BinaryOperator;
26 import java.util.stream.Collectors;
27 import java.util.stream.Stream;
/**
 * Utilities for creating completed and exceptional futures.
 *
 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
 */
34 public final class Futures {
37 * Gets a future result with a default timeout.
39 * @param future the future to block
40 * @param <T> the future result type
41 * @return the future result
42 * @throws RuntimeException if a future exception occurs
44 public static <T> T get(Future<T> future) {
45 return get(future, 30, TimeUnit.SECONDS);
49 * Gets a future result with a default timeout.
51 * @param future the future to block
52 * @param timeout the future timeout
53 * @param timeUnit the future timeout time unit
54 * @param <T> the future result type
55 * @return the future result
56 * @throws RuntimeException if a future exception occurs
58 public static <T> T get(Future<T> future, long timeout, TimeUnit timeUnit) {
60 return future.get(timeout, timeUnit);
61 } catch (InterruptedException | ExecutionException | TimeoutException e) {
62 throw new RuntimeException(e);
67 * Creates a future that is synchronously completed.
69 * @param result The future result.
70 * @return The completed future.
72 public static <T> CompletableFuture<T> completedFuture(T result) {
73 return CompletableFuture.completedFuture(result);
77 * Creates a future that is asynchronously completed.
79 * @param result The future result.
80 * @param executor The executor on which to complete the future.
81 * @return The completed future.
83 public static <T> CompletableFuture<T> completedFutureAsync(T result, Executor executor) {
84 CompletableFuture<T> future = new CompletableFuture<>();
85 executor.execute(() -> future.complete(result));
90 * Creates a future that is synchronously completed exceptionally.
92 * @param t The future exception.
93 * @return The exceptionally completed future.
95 public static <T> CompletableFuture<T> exceptionalFuture(Throwable t) {
96 CompletableFuture<T> future = new CompletableFuture<>();
97 future.completeExceptionally(t);
102 * Creates a future that is asynchronously completed exceptionally.
104 * @param t The future exception.
105 * @param executor The executor on which to complete the future.
106 * @return The exceptionally completed future.
108 public static <T> CompletableFuture<T> exceptionalFutureAsync(Throwable t, Executor executor) {
109 CompletableFuture<T> future = new CompletableFuture<>();
110 executor.execute(() -> {
111 future.completeExceptionally(t);
117 * Returns a future that completes callbacks in add order.
119 * @param <T> future value type
120 * @return a new completable future that will complete added callbacks in the order in which they were added
122 public static <T> CompletableFuture<T> orderedFuture() {
123 return new OrderedFuture<>();
127 * Returns a future that completes callbacks in add order.
129 * @param <T> future value type
130 * @return a new completable future that will complete added callbacks in the order in which they were added
132 public static <T> CompletableFuture<T> orderedFuture(CompletableFuture<T> future) {
133 CompletableFuture<T> newFuture = new OrderedFuture<>();
134 future.whenComplete((r, e) -> {
136 newFuture.complete(r);
138 newFuture.completeExceptionally(e);
145 * Returns a wrapped future that will be completed on the given executor.
147 * @param future the future to be completed on the given executor
148 * @param executor the executor with which to complete the future
149 * @param <T> the future value type
150 * @return a wrapped future to be completed on the given executor
152 public static <T> CompletableFuture<T> asyncFuture(CompletableFuture<T> future, Executor executor) {
153 CompletableFuture<T> newFuture = new AtomixFuture<>();
154 future.whenComplete((result, error) -> {
155 executor.execute(() -> {
157 newFuture.complete(result);
159 newFuture.completeExceptionally(error);
167 * Returns a new CompletableFuture completed with a list of computed values
168 * when all of the given CompletableFuture complete.
170 * @param futures the CompletableFutures
171 * @param <T> value type of CompletableFuture
172 * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
174 @SuppressWarnings("unchecked")
175 public static <T> CompletableFuture<Stream<T>> allOf(Stream<CompletableFuture<T>> futures) {
176 CompletableFuture<T>[] futuresArray = futures.toArray(CompletableFuture[]::new);
177 return AtomixFuture.wrap(CompletableFuture.allOf(futuresArray)
178 .thenApply(v -> Stream.of(futuresArray).map(CompletableFuture::join)));
182 * Returns a new CompletableFuture completed with a list of computed values
183 * when all of the given CompletableFuture complete.
185 * @param futures the CompletableFutures
186 * @param <T> value type of CompletableFuture
187 * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
189 public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
190 return AtomixFuture.wrap(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
191 .thenApply(v -> futures.stream()
192 .map(CompletableFuture::join)
193 .collect(Collectors.toList())));
197 * Returns a new CompletableFuture completed by reducing a list of computed values
198 * when all of the given CompletableFuture complete.
200 * @param futures the CompletableFutures
201 * @param reducer reducer for computing the result
202 * @param emptyValue zero value to be returned if the input future list is empty
203 * @param <T> value type of CompletableFuture
204 * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
206 public static <T> CompletableFuture<T> allOf(
207 List<CompletableFuture<T>> futures, BinaryOperator<T> reducer, T emptyValue) {
208 return allOf(futures).thenApply(resultList -> resultList.stream().reduce(reducer).orElse(emptyValue));