Import atomix/{storage,utils}
[controller.git] / third-party / atomix / utils / src / main / java / io / atomix / utils / concurrent / Futures.java
1 /*
2  * Copyright 2015-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.concurrent;
17
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.function.BinaryOperator;
26 import java.util.stream.Collectors;
27 import java.util.stream.Stream;
28
29 /**
30  * Utilities for creating completed and exceptional futures.
31  *
32  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
33  */
34 public final class Futures {
35
36   /**
37    * Gets a future result with a default timeout.
38    *
39    * @param future the future to block
40    * @param <T> the future result type
41    * @return the future result
42    * @throws RuntimeException if a future exception occurs
43    */
44   public static <T> T get(Future<T> future) {
45     return get(future, 30, TimeUnit.SECONDS);
46   }
47
48   /**
49    * Gets a future result with a default timeout.
50    *
51    * @param future the future to block
52    * @param timeout the future timeout
53    * @param timeUnit the future timeout time unit
54    * @param <T> the future result type
55    * @return the future result
56    * @throws RuntimeException if a future exception occurs
57    */
58   public static <T> T get(Future<T> future, long timeout, TimeUnit timeUnit) {
59     try {
60       return future.get(timeout, timeUnit);
61     } catch (InterruptedException | ExecutionException | TimeoutException e) {
62       throw new RuntimeException(e);
63     }
64   }
65
66   /**
67    * Creates a future that is synchronously completed.
68    *
69    * @param result The future result.
70    * @return The completed future.
71    */
72   public static <T> CompletableFuture<T> completedFuture(T result) {
73     return CompletableFuture.completedFuture(result);
74   }
75
76   /**
77    * Creates a future that is asynchronously completed.
78    *
79    * @param result   The future result.
80    * @param executor The executor on which to complete the future.
81    * @return The completed future.
82    */
83   public static <T> CompletableFuture<T> completedFutureAsync(T result, Executor executor) {
84     CompletableFuture<T> future = new CompletableFuture<>();
85     executor.execute(() -> future.complete(result));
86     return future;
87   }
88
89   /**
90    * Creates a future that is synchronously completed exceptionally.
91    *
92    * @param t The future exception.
93    * @return The exceptionally completed future.
94    */
95   public static <T> CompletableFuture<T> exceptionalFuture(Throwable t) {
96     CompletableFuture<T> future = new CompletableFuture<>();
97     future.completeExceptionally(t);
98     return future;
99   }
100
101   /**
102    * Creates a future that is asynchronously completed exceptionally.
103    *
104    * @param t        The future exception.
105    * @param executor The executor on which to complete the future.
106    * @return The exceptionally completed future.
107    */
108   public static <T> CompletableFuture<T> exceptionalFutureAsync(Throwable t, Executor executor) {
109     CompletableFuture<T> future = new CompletableFuture<>();
110     executor.execute(() -> {
111       future.completeExceptionally(t);
112     });
113     return future;
114   }
115
116   /**
117    * Returns a future that completes callbacks in add order.
118    *
119    * @param <T> future value type
120    * @return a new completable future that will complete added callbacks in the order in which they were added
121    */
122   public static <T> CompletableFuture<T> orderedFuture() {
123     return new OrderedFuture<>();
124   }
125
126   /**
127    * Returns a future that completes callbacks in add order.
128    *
129    * @param <T> future value type
130    * @return a new completable future that will complete added callbacks in the order in which they were added
131    */
132   public static <T> CompletableFuture<T> orderedFuture(CompletableFuture<T> future) {
133     CompletableFuture<T> newFuture = new OrderedFuture<>();
134     future.whenComplete((r, e) -> {
135       if (e == null) {
136         newFuture.complete(r);
137       } else {
138         newFuture.completeExceptionally(e);
139       }
140     });
141     return newFuture;
142   }
143
144   /**
145    * Returns a wrapped future that will be completed on the given executor.
146    *
147    * @param future   the future to be completed on the given executor
148    * @param executor the executor with which to complete the future
149    * @param <T>      the future value type
150    * @return a wrapped future to be completed on the given executor
151    */
152   public static <T> CompletableFuture<T> asyncFuture(CompletableFuture<T> future, Executor executor) {
153     CompletableFuture<T> newFuture = new AtomixFuture<>();
154     future.whenComplete((result, error) -> {
155       executor.execute(() -> {
156         if (error == null) {
157           newFuture.complete(result);
158         } else {
159           newFuture.completeExceptionally(error);
160         }
161       });
162     });
163     return newFuture;
164   }
165
166   /**
167    * Returns a new CompletableFuture completed with a list of computed values
168    * when all of the given CompletableFuture complete.
169    *
170    * @param futures the CompletableFutures
171    * @param <T>     value type of CompletableFuture
172    * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
173    */
174   @SuppressWarnings("unchecked")
175   public static <T> CompletableFuture<Stream<T>> allOf(Stream<CompletableFuture<T>> futures) {
176     CompletableFuture<T>[] futuresArray = futures.toArray(CompletableFuture[]::new);
177     return AtomixFuture.wrap(CompletableFuture.allOf(futuresArray)
178         .thenApply(v -> Stream.of(futuresArray).map(CompletableFuture::join)));
179   }
180
181   /**
182    * Returns a new CompletableFuture completed with a list of computed values
183    * when all of the given CompletableFuture complete.
184    *
185    * @param futures the CompletableFutures
186    * @param <T>     value type of CompletableFuture
187    * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
188    */
189   public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
190     return AtomixFuture.wrap(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
191         .thenApply(v -> futures.stream()
192             .map(CompletableFuture::join)
193             .collect(Collectors.toList())));
194   }
195
196   /**
197    * Returns a new CompletableFuture completed by reducing a list of computed values
198    * when all of the given CompletableFuture complete.
199    *
200    * @param futures    the CompletableFutures
201    * @param reducer    reducer for computing the result
202    * @param emptyValue zero value to be returned if the input future list is empty
203    * @param <T>        value type of CompletableFuture
204    * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
205    */
206   public static <T> CompletableFuture<T> allOf(
207       List<CompletableFuture<T>> futures, BinaryOperator<T> reducer, T emptyValue) {
208     return allOf(futures).thenApply(resultList -> resultList.stream().reduce(reducer).orElse(emptyValue));
209   }
210
211 }