Import atomix/{storage,utils}
[controller.git] / third-party / atomix / utils / src / main / java / io / atomix / utils / concurrent / AtomixFuture.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.concurrent;
17
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.CompletionStage;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.function.BiConsumer;
25 import java.util.function.BiFunction;
26 import java.util.function.Consumer;
27 import java.util.function.Function;
28
29 /**
30  * A {@link CompletableFuture} that tracks whether the future or one of its descendants has been blocked on
31  * a {@link CompletableFuture#get()} or {@link CompletableFuture#join()} call.
32  */
33 public class AtomixFuture<T> extends CompletableFuture<T> {
34
35   /**
36    * Wraps the given future in a new blockable future.
37    *
38    * @param future the future to wrap
39    * @param <T>    the future value type
40    * @return a new blockable future
41    */
42   public static <T> AtomixFuture<T> wrap(CompletableFuture<T> future) {
43     AtomixFuture<T> newFuture = new AtomixFuture<>();
44     future.whenComplete((result, error) -> {
45       if (error == null) {
46         newFuture.complete(result);
47       } else {
48         newFuture.completeExceptionally(error);
49       }
50     });
51     return newFuture;
52   }
53
54   /**
55    * Returns a new completed Atomix future.
56    *
57    * @param result the future result
58    * @param <T>    the future result type
59    * @return the completed future
60    */
61   public static <T> CompletableFuture<T> completedFuture(T result) {
62     CompletableFuture<T> future = new AtomixFuture<>();
63     future.complete(result);
64     return future;
65   }
66
67   /**
68    * Returns a new exceptionally completed Atomix future.
69    *
70    * @param t   the future exception
71    * @param <T> the future result type
72    * @return the completed future
73    */
74   public static <T> CompletableFuture<T> exceptionalFuture(Throwable t) {
75     CompletableFuture<T> future = new AtomixFuture<>();
76     future.completeExceptionally(t);
77     return future;
78   }
79
80   private static final ThreadContext NULL_CONTEXT = new NullThreadContext();
81
82   private ThreadContext getThreadContext() {
83     ThreadContext context = ThreadContext.currentContext();
84     return context != null ? context : NULL_CONTEXT;
85   }
86
87   @Override
88   public T get() throws InterruptedException, ExecutionException {
89     ThreadContext context = getThreadContext();
90     context.block();
91     try {
92       return super.get();
93     } finally {
94       context.unblock();
95     }
96   }
97
98   @Override
99   public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
100     ThreadContext context = getThreadContext();
101     context.block();
102     try {
103       return super.get(timeout, unit);
104     } finally {
105       context.unblock();
106     }
107   }
108
109   @Override
110   public synchronized T join() {
111     ThreadContext context = getThreadContext();
112     context.block();
113     try {
114       return super.join();
115     } finally {
116       context.unblock();
117     }
118   }
119
120   @Override
121   public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
122     return wrap(super.thenApply(fn));
123   }
124
125   @Override
126   public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
127     return wrap(super.thenApplyAsync(fn));
128   }
129
130   @Override
131   public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
132     return wrap(super.thenApplyAsync(fn, executor));
133   }
134
135   @Override
136   public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
137     return wrap(super.thenAccept(action));
138   }
139
140   @Override
141   public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
142     return wrap(super.thenAcceptAsync(action));
143   }
144
145   @Override
146   public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
147     return wrap(super.thenAcceptAsync(action, executor));
148   }
149
150   @Override
151   public CompletableFuture<Void> thenRun(Runnable action) {
152     return wrap(super.thenRun(action));
153   }
154
155   @Override
156   public CompletableFuture<Void> thenRunAsync(Runnable action) {
157     return wrap(super.thenRunAsync(action));
158   }
159
160   @Override
161   public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
162     return wrap(super.thenRunAsync(action, executor));
163   }
164
165   @Override
166   public <U, V> CompletableFuture<V> thenCombine(
167       CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
168     return wrap(super.thenCombine(other, fn));
169   }
170
171   @Override
172   public <U, V> CompletableFuture<V> thenCombineAsync(
173       CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
174     return wrap(super.thenCombineAsync(other, fn));
175   }
176
177   @Override
178   public <U, V> CompletableFuture<V> thenCombineAsync(
179       CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) {
180     return wrap(super.thenCombineAsync(other, fn, executor));
181   }
182
183   @Override
184   public <U> CompletableFuture<Void> thenAcceptBoth(
185       CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) {
186     return wrap(super.thenAcceptBoth(other, action));
187   }
188
189   @Override
190   public <U> CompletableFuture<Void> thenAcceptBothAsync(
191       CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) {
192     return wrap(super.thenAcceptBothAsync(other, action));
193   }
194
195   @Override
196   public <U> CompletableFuture<Void> thenAcceptBothAsync(
197       CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) {
198     return wrap(super.thenAcceptBothAsync(other, action, executor));
199   }
200
201   @Override
202   public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
203     return wrap(super.runAfterBoth(other, action));
204   }
205
206   @Override
207   public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
208     return wrap(super.runAfterBothAsync(other, action));
209   }
210
211   @Override
212   public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
213     return wrap(super.runAfterBothAsync(other, action, executor));
214   }
215
216   @Override
217   public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
218     return wrap(super.applyToEither(other, fn));
219   }
220
221   @Override
222   public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {
223     return wrap(super.applyToEitherAsync(other, fn));
224   }
225
226   @Override
227   public <U> CompletableFuture<U> applyToEitherAsync(
228       CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) {
229     return wrap(super.applyToEitherAsync(other, fn, executor));
230   }
231
232   @Override
233   public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
234     return wrap(super.acceptEither(other, action));
235   }
236
237   @Override
238   public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
239     return wrap(super.acceptEitherAsync(other, action));
240   }
241
242   @Override
243   public CompletableFuture<Void> acceptEitherAsync(
244       CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) {
245     return wrap(super.acceptEitherAsync(other, action, executor));
246   }
247
248   @Override
249   public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
250     return wrap(super.runAfterEither(other, action));
251   }
252
253   @Override
254   public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
255     return wrap(super.runAfterEitherAsync(other, action));
256   }
257
258   @Override
259   public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) {
260     return wrap(super.runAfterEitherAsync(other, action, executor));
261   }
262
263   @Override
264   public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
265     return wrap(super.thenCompose(fn));
266   }
267
268   @Override
269   public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
270     return wrap(super.thenComposeAsync(fn));
271   }
272
273   @Override
274   public <U> CompletableFuture<U> thenComposeAsync(
275       Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) {
276     return wrap(super.thenComposeAsync(fn, executor));
277   }
278
279   @Override
280   public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
281     return wrap(super.whenComplete(action));
282   }
283
284   @Override
285   public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
286     return wrap(super.whenCompleteAsync(action));
287   }
288
289   @Override
290   public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
291     return wrap(super.whenCompleteAsync(action, executor));
292   }
293
294   @Override
295   public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
296     return wrap(super.handle(fn));
297   }
298
299   @Override
300   public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {
301     return wrap(super.handleAsync(fn));
302   }
303
304   @Override
305   public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
306     return wrap(super.handleAsync(fn, executor));
307   }
308 }