+
+ private static <R extends OwnerSupervisorReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
+ final CompletionStage<R> stage, final Function<R, O> outputFunction) {
+
+ final SettableFuture<RpcResult<O>> future = SettableFuture.create();
+ stage.whenComplete((reply, failure) -> {
+ if (failure != null) {
+ future.setException(failure);
+ } else {
+ future.set(RpcResultBuilder.success(outputFunction.apply(reply)).build());
+ }
+ });
+ return future;
+ }
+
+ private static ListenableFuture<Empty> toListenableFuture(final String op, final CompletionStage<?> stage) {
+ final SettableFuture<Empty> future = SettableFuture.create();
+ stage.whenComplete((reply, failure) -> {
+ if (failure != null) {
+ LOG.warn("{} DataCenter failed", op, failure);
+ future.setException(failure);
+ } else {
+ LOG.debug("{} DataCenter successful", op);
+ future.set(Empty.getInstance());
+ }
+ });
+ return future;
+ }