Import atomix/{storage,utils}
[controller.git] / third-party / atomix / utils / src / main / java / io / atomix / utils / concurrent / OrderedFuture.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.concurrent;
17
18 import java.util.LinkedList;
19 import java.util.Queue;
20 import java.util.concurrent.CompletableFuture;
21 import java.util.concurrent.CompletionStage;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.function.BiConsumer;
27 import java.util.function.BiFunction;
28 import java.util.function.Consumer;
29 import java.util.function.Function;
30
31 /**
32  * A {@link CompletableFuture} that ensures callbacks are called in FIFO order.
33  * <p>
34  * The default {@link CompletableFuture} does not guarantee the ordering of callbacks, and indeed appears to
35  * execute them in LIFO order.
36  */
37 public class OrderedFuture<T> extends CompletableFuture<T> {
38
39   /**
40    * Wraps the given future in a new blockable future.
41    *
42    * @param future the future to wrap
43    * @param <T>    the future value type
44    * @return a new blockable future
45    */
46   public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
47     CompletableFuture<T> newFuture = new OrderedFuture<>();
48     future.whenComplete((result, error) -> {
49       if (error == null) {
50         newFuture.complete(result);
51       } else {
52         newFuture.completeExceptionally(error);
53       }
54     });
55     return newFuture;
56   }
57
58   private static final ThreadContext NULL_CONTEXT = new NullThreadContext();
59
60   private final Queue<CompletableFuture<T>> orderedFutures = new LinkedList<>();
61   private volatile boolean complete;
62   private volatile T result;
63   private volatile Throwable error;
64
65   public OrderedFuture() {
66     super.whenComplete(this::complete);
67   }
68
69   private ThreadContext getThreadContext() {
70     ThreadContext context = ThreadContext.currentContext();
71     return context != null ? context : NULL_CONTEXT;
72   }
73
74   @Override
75   public T get() throws InterruptedException, ExecutionException {
76     ThreadContext context = getThreadContext();
77     context.block();
78     try {
79       return super.get();
80     } finally {
81       context.unblock();
82     }
83   }
84
85   @Override
86   public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
87     ThreadContext context = getThreadContext();
88     context.block();
89     try {
90       return super.get(timeout, unit);
91     } finally {
92       context.unblock();
93     }
94   }
95
96   @Override
97   public synchronized T join() {
98     ThreadContext context = getThreadContext();
99     context.block();
100     try {
101       return super.join();
102     } finally {
103       context.unblock();
104     }
105   }
106
107   /**
108    * Adds a new ordered future.
109    */
110   private CompletableFuture<T> orderedFuture() {
111     if (!complete) {
112       synchronized (orderedFutures) {
113         if (!complete) {
114           CompletableFuture<T> future = new CompletableFuture<>();
115           orderedFutures.add(future);
116           return future;
117         }
118       }
119     }
120
121     // Completed
122     if (error == null) {
123       return CompletableFuture.completedFuture(result);
124     } else {
125       return Futures.exceptionalFuture(error);
126     }
127   }
128
129   /**
130    * Completes futures in FIFO order.
131    */
132   private void complete(T result, Throwable error) {
133     synchronized (orderedFutures) {
134       this.result = result;
135       this.error = error;
136       this.complete = true;
137       if (error == null) {
138         for (CompletableFuture<T> future : orderedFutures) {
139           future.complete(result);
140         }
141       } else {
142         for (CompletableFuture<T> future : orderedFutures) {
143           future.completeExceptionally(error);
144         }
145       }
146       orderedFutures.clear();
147     }
148   }
149
150   @Override
151   public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
152     return wrap(orderedFuture().thenApply(fn));
153   }
154
155   @Override
156   public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
157     return wrap(orderedFuture().thenApplyAsync(fn));
158   }
159
160   @Override
161   public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
162     return wrap(orderedFuture().thenApplyAsync(fn, executor));
163   }
164
165   @Override
166   public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
167     return wrap(orderedFuture().thenAccept(action));
168   }
169
170   @Override
171   public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
172     return wrap(orderedFuture().thenAcceptAsync(action));
173   }
174
175   @Override
176   public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
177     return wrap(orderedFuture().thenAcceptAsync(action, executor));
178   }
179
180   @Override
181   public CompletableFuture<Void> thenRun(Runnable action) {
182     return wrap(orderedFuture().thenRun(action));
183   }
184
185   @Override
186   public CompletableFuture<Void> thenRunAsync(Runnable action) {
187     return wrap(orderedFuture().thenRunAsync(action));
188   }
189
190   @Override
191   public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
192     return wrap(orderedFuture().thenRunAsync(action, executor));
193   }
194
195   @Override
196   public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
197     return wrap(orderedFuture().thenCombine(other, fn));
198   }
199
200   @Override
201   public <U, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
202     return wrap(orderedFuture().thenCombineAsync(other, fn));
203   }
204
205   @Override
206   public <U, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) {
207     return wrap(orderedFuture().thenCombineAsync(other, fn, executor));
208   }
209
210   @Override
211   public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) {
212     return wrap(orderedFuture().thenAcceptBoth(other, action));
213   }
214
215   @Override
216   public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) {
217     return wrap(orderedFuture().thenAcceptBothAsync(other, action));
218   }
219
220   @Override
221   public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) {
222     return wrap(orderedFuture().thenAcceptBothAsync(other, action, executor));
223   }
224
225   @Override
226   public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
227     return wrap(orderedFuture().runAfterBoth(other, action));
228   }
229
230   @Override
231   public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
232     return wrap(orderedFuture().runAfterBothAsync(other, action));
233   }
234
235   @Override
236   public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
237     return wrap(orderedFuture().runAfterBothAsync(other, action, executor));
238   }
239
240   @Override
241   public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
242     return wrap(orderedFuture().applyToEither(other, fn));
243   }
244
245   @Override
246   public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {
247     return wrap(orderedFuture().applyToEitherAsync(other, fn));
248   }
249
250   @Override
251   public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) {
252     return wrap(orderedFuture().applyToEitherAsync(other, fn, executor));
253   }
254
255   @Override
256   public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
257     return wrap(orderedFuture().acceptEither(other, action));
258   }
259
260   @Override
261   public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
262     return wrap(orderedFuture().acceptEitherAsync(other, action));
263   }
264
265   @Override
266   public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) {
267     return wrap(orderedFuture().acceptEitherAsync(other, action, executor));
268   }
269
270   @Override
271   public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
272     return wrap(orderedFuture().runAfterEither(other, action));
273   }
274
275   @Override
276   public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
277     return wrap(orderedFuture().runAfterEitherAsync(other, action));
278   }
279
280   @Override
281   public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) {
282     return wrap(orderedFuture().runAfterEitherAsync(other, action, executor));
283   }
284
285   @Override
286   public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
287     return wrap(orderedFuture().thenCompose(fn));
288   }
289
290   @Override
291   public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
292     return wrap(orderedFuture().thenComposeAsync(fn));
293   }
294
295   @Override
296   public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) {
297     return wrap(orderedFuture().thenComposeAsync(fn, executor));
298   }
299
300   @Override
301   public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
302     return wrap(orderedFuture().whenComplete(action));
303   }
304
305   @Override
306   public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
307     return wrap(orderedFuture().whenCompleteAsync(action));
308   }
309
310   @Override
311   public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
312     return wrap(orderedFuture().whenCompleteAsync(action, executor));
313   }
314
315   @Override
316   public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
317     return wrap(orderedFuture().handle(fn));
318   }
319
320   @Override
321   public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {
322     return wrap(orderedFuture().handleAsync(fn));
323   }
324
325   @Override
326   public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
327     return wrap(orderedFuture().handleAsync(fn, executor));
328   }
329
330   @Override
331   public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
332     return wrap(orderedFuture().exceptionally(fn));
333   }
334
335   @Override
336   public CompletableFuture<T> toCompletableFuture() {
337     return this;
338   }
339 }