CONTROLLER-1641: Handle commit cohorts async
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
index c4afcfd4d803cf275894d270c9d2574681558bfe..0ef49b6244c33a8a86fb7adec36106117c1f6175 100644 (file)
@@ -13,22 +13,25 @@ import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.dispatch.ExecutionContexts;
 import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
 import akka.dispatch.Recover;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
@@ -36,7 +39,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
+import scala.compat.java8.FutureConverters;
 import scala.concurrent.Future;
 
 /**
@@ -98,16 +101,19 @@ class CompositeDataTreeCohort {
     private final DataTreeCohortActorRegistry registry;
     private final TransactionIdentifier txId;
     private final SchemaContext schema;
+    private final Executor callbackExecutor;
     private final Timeout timeout;
 
-    private List<Success> successfulFromPrevious;
+    @Nonnull
+    private List<Success> successfulFromPrevious = Collections.emptyList();
     private State state = State.IDLE;
 
     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
-        final SchemaContext schema, final Timeout timeout) {
+        final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
         this.registry = Preconditions.checkNotNull(registry);
         this.txId = Preconditions.checkNotNull(transactionID);
         this.schema = Preconditions.checkNotNull(schema);
+        this.callbackExecutor = Preconditions.checkNotNull(callbackExecutor);
         this.timeout = Preconditions.checkNotNull(timeout);
     }
 
@@ -129,11 +135,11 @@ class CompositeDataTreeCohort {
                 throw new IllegalStateException("Unhandled state " + state);
         }
 
-        successfulFromPrevious = null;
+        successfulFromPrevious = Collections.emptyList();
         state = State.IDLE;
     }
 
-    void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
+    Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("{}: canCommit - candidate: {}", txId, tip);
         } else {
@@ -143,9 +149,9 @@ class CompositeDataTreeCohort {
         final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
         LOG.debug("{}: canCommit - messages: {}", txId, messages);
         if (messages.isEmpty()) {
-            successfulFromPrevious = ImmutableList.of();
+            successfulFromPrevious = Collections.emptyList();
             changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
-            return;
+            return Optional.empty();
         }
 
         final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
@@ -158,43 +164,41 @@ class CompositeDataTreeCohort {
         }
 
         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
-        processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
+        return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
     }
 
-    void preCommit() throws ExecutionException, TimeoutException {
+    Optional<CompletionStage<Void>> preCommit() {
         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
 
-        Preconditions.checkState(successfulFromPrevious != null);
         if (successfulFromPrevious.isEmpty()) {
             changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
-            return;
+            return Optional.empty();
         }
 
         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
             new DataTreeCohortActor.PreCommit(txId));
         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
-        processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
+        return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
     }
 
-    void commit() throws ExecutionException, TimeoutException {
+    Optional<CompletionStage<Void>> commit() {
         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
         if (successfulFromPrevious.isEmpty()) {
             changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
-            return;
+            return Optional.empty();
         }
 
-        Preconditions.checkState(successfulFromPrevious != null);
         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
             new DataTreeCohortActor.Commit(txId));
         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
-        processResponses(futures, State.COMMIT_SENT, State.COMMITED);
+        return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
     }
 
-    Optional<List<Future<Object>>> abort() {
+    Optional<CompletionStage<?>> abort() {
         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
 
         state = State.ABORTED;
-        if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) {
+        if (successfulFromPrevious.isEmpty()) {
             return Optional.empty();
         }
 
@@ -203,7 +207,8 @@ class CompositeDataTreeCohort {
         for (Success s : successfulFromPrevious) {
             futures.add(Patterns.ask(s.getCohort(), message, timeout));
         }
-        return Optional.of(futures);
+
+        return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
     }
 
     private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
@@ -217,37 +222,38 @@ class CompositeDataTreeCohort {
         return ret;
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void processResponses(final List<Entry<ActorRef, Future<Object>>> futures, final State currentState,
-            final State afterState) throws TimeoutException, ExecutionException {
+    @Nonnull
+    private CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
+            final State currentState, final State afterState) {
         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+        final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
+        Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
+                ExecutionContexts.global());
 
-        final Iterable<Object> results;
-        try {
-            results = Await.result(Futures.sequence(Lists.transform(futures, Entry::getValue),
-                ExecutionContexts.global()), timeout.duration());
-        } catch (TimeoutException e) {
-            successfulFromPrevious = null;
-            LOG.debug("{}: processResponses - error from Future", txId, e);
-
-            for (Entry<ActorRef, Future<Object>> f : futures) {
-                if (!f.getValue().isCompleted()) {
-                    LOG.info("{}: actor {} failed to respond", txId, f.getKey());
-                }
+        aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
+            @Override
+            public void onComplete(Throwable failure, Iterable<Object> results) {
+                callbackExecutor.execute(
+                    () -> processResponses(failure, results, currentState, afterState, returnFuture));
             }
-            throw e;
-        } catch (ExecutionException e) {
-            successfulFromPrevious = null;
-            LOG.debug("{}: processResponses - error from Future", txId, e);
-            throw e;
-        } catch (Exception e) {
-            successfulFromPrevious = null;
-            LOG.debug("{}: processResponses - error from Future", txId, e);
-            throw new ExecutionException(e);
+        }, ExecutionContexts.global());
+
+        return returnFuture;
+    }
+
+    // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
+    // generic type is Void.
+    @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
+    private void processResponses(Throwable failure, Iterable<Object> results, State currentState, State afterState,
+            CompletableFuture<Void> resultFuture) {
+        if (failure != null) {
+            successfulFromPrevious = Collections.emptyList();
+            resultFuture.completeExceptionally(failure);
+            return;
         }
 
         final Collection<Failure> failed = new ArrayList<>(1);
-        final List<Success> successful = new ArrayList<>(futures.size());
+        final List<Success> successful = new ArrayList<>();
         for (Object result : results) {
             if (result instanceof DataTreeCohortActor.Success) {
                 successful.add((Success) result);
@@ -260,7 +266,6 @@ class CompositeDataTreeCohort {
 
         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
 
-        successfulFromPrevious = successful;
         if (!failed.isEmpty()) {
             changeStateFrom(currentState, State.FAILED);
             final Iterator<Failure> it = failed.iterator();
@@ -268,11 +273,14 @@ class CompositeDataTreeCohort {
             while (it.hasNext()) {
                 firstEx.addSuppressed(it.next().cause());
             }
-            Throwables.throwIfInstanceOf(firstEx, ExecutionException.class);
-            Throwables.throwIfInstanceOf(firstEx, TimeoutException.class);
-            throw new ExecutionException(firstEx);
+
+            successfulFromPrevious = Collections.emptyList();
+            resultFuture.completeExceptionally(firstEx);
+        } else {
+            successfulFromPrevious = successful;
+            changeStateFrom(currentState, afterState);
+            resultFuture.complete(null);
         }
-        changeStateFrom(currentState, afterState);
     }
 
     void changeStateFrom(final State expected, final State followup) {