/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.atomix.utils.concurrent;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * A {@link CompletableFuture} that ensures callbacks are called in FIFO order.
 * <p>
 * The default {@link CompletableFuture} does not guarantee the ordering of callbacks, and indeed appears to
 * execute them in LIFO order.
 * <p>
 * Ordering is achieved by giving every registered callback its own intermediate future. Those intermediate
 * futures are kept in a queue and completed one after another, in registration order, once this future
 * completes.
 *
 * @param <T> the future value type
 */
public class OrderedFuture<T> extends CompletableFuture<T> {

  /**
   * Wraps the given future in a new ordered future.
   * <p>
   * The returned future completes with the same result or the same error as {@code future}.
   *
   * @param future the future to wrap
   * @param <T> the future value type
   * @return a new ordered future mirroring the outcome of {@code future}
   */
  public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
    CompletableFuture<T> newFuture = new OrderedFuture<>();
    future.whenComplete((result, error) -> {
      if (error == null) {
        newFuture.complete(result);
      } else {
        newFuture.completeExceptionally(error);
      }
    });
    return newFuture;
  }

  /** Fallback used when the calling thread has no current {@link ThreadContext}. */
  private static final ThreadContext NULL_CONTEXT = new NullThreadContext();

  /** Per-callback futures awaiting completion, in registration order. Also used as the lock object. */
  private final Queue<CompletableFuture<T>> orderedFutures = new LinkedList<>();

  /** Set once the outcome below has been recorded; read without the lock on the fast path. */
  private volatile boolean complete;
  private volatile T result;
  private volatile Throwable error;

  /**
   * Creates a new, incomplete ordered future.
   */
  public OrderedFuture() {
    // The single callback registered on the underlying future fans the outcome out to the queue.
    super.whenComplete(this::complete);
  }

  /**
   * Returns the thread context of the calling thread, or a null-object context if there is none.
   */
  private ThreadContext getThreadContext() {
    ThreadContext context = ThreadContext.currentContext();
    return context != null ? context : NULL_CONTEXT;
  }

  /**
   * Waits for this future to complete, marking the caller's thread context as blocked for the duration.
   */
  @Override
  public T get() throws InterruptedException, ExecutionException {
    ThreadContext context = getThreadContext();
    context.block();
    try {
      return super.get();
    } finally {
      context.unblock();
    }
  }

  /**
   * Waits up to the given timeout for this future to complete, marking the caller's thread context as
   * blocked for the duration.
   */
  @Override
  public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    ThreadContext context = getThreadContext();
    context.block();
    try {
      return super.get(timeout, unit);
    } finally {
      context.unblock();
    }
  }

  /**
   * Waits for this future to complete, marking the caller's thread context as blocked for the duration.
   * <p>
   * Deliberately not {@code synchronized}: parking while holding this future's monitor would stall every
   * other thread that synchronizes on the future, and neither {@code get} overload takes the monitor.
   */
  @Override
  public T join() {
    ThreadContext context = getThreadContext();
    context.block();
    try {
      return super.join();
    } finally {
      context.unblock();
    }
  }

  /**
   * Adds a new ordered future.
   * <p>
   * While this future is incomplete, a fresh future is appended to the queue and returned. Once this future
   * is complete, an already-completed future carrying the recorded result or error is returned instead.
   */
  private CompletableFuture<T> orderedFuture() {
    if (!complete) {
      synchronized (orderedFutures) {
        // Re-check under the lock: completion may have drained the queue since the unlocked check.
        if (!complete) {
          CompletableFuture<T> future = new CompletableFuture<>();
          orderedFutures.add(future);
          return future;
        }
      }
    }

    // Completed
    if (error == null) {
      return CompletableFuture.completedFuture(result);
    } else {
      return Futures.exceptionalFuture(error);
    }
  }

  /**
   * Completes futures in FIFO order.
   *
   * @param result the value this future completed with, if it completed normally
   * @param error the failure this future completed with, or {@code null} on success
   */
  private void complete(T result, Throwable error) {
    synchronized (orderedFutures) {
      // Record the outcome before raising the flag so lock-free readers of the flag see it.
      this.result = result;
      this.error = error;
      this.complete = true;
      if (error == null) {
        for (CompletableFuture<T> future : orderedFutures) {
          future.complete(result);
        }
      } else {
        for (CompletableFuture<T> future : orderedFutures) {
          future.completeExceptionally(error);
        }
      }
      orderedFutures.clear();
    }
  }

  // ---------------------------------------------------------------------------------------------
  // Dependent-stage methods. Each one registers the callback on a dedicated future obtained from
  // orderedFuture() and wraps the resulting stage, so the stage handed back to the caller is itself
  // an ordered future.
  // ---------------------------------------------------------------------------------------------

  @Override
  public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
    return wrap(orderedFuture().thenApply(fn));
  }

  @Override
  public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
    return wrap(orderedFuture().thenApplyAsync(fn));
  }

  @Override
  public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
    return wrap(orderedFuture().thenApplyAsync(fn, executor));
  }

  @Override
  public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return wrap(orderedFuture().thenAccept(action));
  }

  @Override
  public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return wrap(orderedFuture().thenAcceptAsync(action));
  }

  @Override
  public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
    return wrap(orderedFuture().thenAcceptAsync(action, executor));
  }

  @Override
  public CompletableFuture<Void> thenRun(Runnable action) {
    return wrap(orderedFuture().thenRun(action));
  }

  @Override
  public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return wrap(orderedFuture().thenRunAsync(action));
  }

  @Override
  public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
    return wrap(orderedFuture().thenRunAsync(action, executor));
  }

  @Override
  public <U, V> CompletableFuture<V> thenCombine(
      CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
    return wrap(orderedFuture().thenCombine(other, fn));
  }

  @Override
  public <U, V> CompletableFuture<V> thenCombineAsync(
      CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
    return wrap(orderedFuture().thenCombineAsync(other, fn));
  }

  @Override
  public <U, V> CompletableFuture<V> thenCombineAsync(
      CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) {
    return wrap(orderedFuture().thenCombineAsync(other, fn, executor));
  }

  @Override
  public <U> CompletableFuture<Void> thenAcceptBoth(
      CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) {
    return wrap(orderedFuture().thenAcceptBoth(other, action));
  }

  @Override
  public <U> CompletableFuture<Void> thenAcceptBothAsync(
      CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) {
    return wrap(orderedFuture().thenAcceptBothAsync(other, action));
  }

  @Override
  public <U> CompletableFuture<Void> thenAcceptBothAsync(
      CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) {
    return wrap(orderedFuture().thenAcceptBothAsync(other, action, executor));
  }

  @Override
  public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
    return wrap(orderedFuture().runAfterBoth(other, action));
  }

  @Override
  public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
    return wrap(orderedFuture().runAfterBothAsync(other, action));
  }

  @Override
  public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
    return wrap(orderedFuture().runAfterBothAsync(other, action, executor));
  }

  @Override
  public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return wrap(orderedFuture().applyToEither(other, fn));
  }

  @Override
  public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return wrap(orderedFuture().applyToEitherAsync(other, fn));
  }

  @Override
  public <U> CompletableFuture<U> applyToEitherAsync(
      CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) {
    return wrap(orderedFuture().applyToEitherAsync(other, fn, executor));
  }

  @Override
  public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
    return wrap(orderedFuture().acceptEither(other, action));
  }

  @Override
  public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
    return wrap(orderedFuture().acceptEitherAsync(other, action));
  }

  @Override
  public CompletableFuture<Void> acceptEitherAsync(
      CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) {
    return wrap(orderedFuture().acceptEitherAsync(other, action, executor));
  }

  @Override
  public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
    return wrap(orderedFuture().runAfterEither(other, action));
  }

  @Override
  public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
    return wrap(orderedFuture().runAfterEitherAsync(other, action));
  }

  @Override
  public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) {
    return wrap(orderedFuture().runAfterEitherAsync(other, action, executor));
  }

  @Override
  public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
    return wrap(orderedFuture().thenCompose(fn));
  }

  @Override
  public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
    return wrap(orderedFuture().thenComposeAsync(fn));
  }

  @Override
  public <U> CompletableFuture<U> thenComposeAsync(
      Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) {
    return wrap(orderedFuture().thenComposeAsync(fn, executor));
  }

  @Override
  public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
    return wrap(orderedFuture().whenComplete(action));
  }

  @Override
  public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
    return wrap(orderedFuture().whenCompleteAsync(action));
  }

  @Override
  public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    return wrap(orderedFuture().whenCompleteAsync(action, executor));
  }

  @Override
  public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
    return wrap(orderedFuture().handle(fn));
  }

  @Override
  public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {
    return wrap(orderedFuture().handleAsync(fn));
  }

  @Override
  public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return wrap(orderedFuture().handleAsync(fn, executor));
  }

  @Override
  public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
    return wrap(orderedFuture().exceptionally(fn));
  }

  /**
   * Returns this future itself, so ordering guarantees are retained by callers using the returned reference.
   */
  @Override
  public CompletableFuture<T> toCompletableFuture() {
    return this;
  }
}