Bug 8424: Don't output data tree and tree candidates wih debug
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
index 05b9981113a039a75b3679698d9ddd61e8d97318..338ae645345bc8b4e3237965260e67590ad811da 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.dispatch.ExecutionContexts;
@@ -17,9 +18,14 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -28,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanComm
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
@@ -39,6 +47,7 @@ import scala.concurrent.Future;
  *
  */
 class CompositeDataTreeCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
 
     private enum State {
         /**
@@ -70,7 +79,7 @@ class CompositeDataTreeCohort {
          */
         COMMITED,
         /**
-         * Some of cohorts responsed back with unsuccessful message.
+         * Some of cohorts responded back with unsuccessful message.
          */
         FAILED,
         /**
@@ -79,19 +88,19 @@ class CompositeDataTreeCohort {
         ABORTED
     }
 
-    protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
+    static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
         @Override
-        public Failure recover(final Throwable error) throws Throwable {
+        public Failure recover(final Throwable error) {
             return new Failure(error);
         }
     };
 
-
     private final DataTreeCohortActorRegistry registry;
     private final TransactionIdentifier txId;
     private final SchemaContext schema;
     private final Timeout timeout;
-    private Iterable<Success> successfulFromPrevious;
+
+    private List<Success> successfulFromPrevious;
     private State state = State.IDLE;
 
     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
@@ -102,67 +111,166 @@ class CompositeDataTreeCohort {
         this.timeout = Preconditions.checkNotNull(timeout);
     }
 
+    void reset() {
+        switch (state) {
+            case CAN_COMMIT_SENT:
+            case CAN_COMMIT_SUCCESSFUL:
+            case PRE_COMMIT_SENT:
+            case PRE_COMMIT_SUCCESSFUL:
+            case COMMIT_SENT:
+                abort();
+                break;
+            case ABORTED:
+            case COMMITED:
+            case FAILED:
+            case IDLE:
+                break;
+            default:
+                throw new IllegalStateException("Unhandled state " + state);
+        }
+
+        successfulFromPrevious = null;
+        state = State.IDLE;
+    }
+
     void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
-        Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
-        // FIXME: Optimize empty collection list with pre-created futures, containing success.
-        Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
-            input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
-                    ExecutionContexts.global()), ExecutionContexts.global());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("{}: canCommit - candidate: {}", txId, tip);
+        } else {
+            LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
+        }
+
+        final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+        LOG.debug("{}: canCommit - messages: {}", txId, messages);
+        if (messages.isEmpty()) {
+            successfulFromPrevious = ImmutableList.of();
+            changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
+            return;
+        }
+
+        final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
+        for (CanCommit message : messages) {
+            final ActorRef actor = message.getCohort();
+            final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
+                ExecutionContexts.global());
+            LOG.trace("{}: requesting canCommit from {}", txId, actor);
+            futures.add(new SimpleImmutableEntry<>(actor, future));
+        }
+
         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
-        processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
+        processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
     }
 
     void preCommit() throws ExecutionException, TimeoutException {
+        LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
         Preconditions.checkState(successfulFromPrevious != null);
-        Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
+        if (successfulFromPrevious.isEmpty()) {
+            changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
+            return;
+        }
+
+        final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
+            new DataTreeCohortActor.PreCommit(txId));
         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
-        processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
+        processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
     }
 
     void commit() throws ExecutionException, TimeoutException {
+        LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+        if (successfulFromPrevious.isEmpty()) {
+            changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
+            return;
+        }
+
         Preconditions.checkState(successfulFromPrevious != null);
-        Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
+        final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
+            new DataTreeCohortActor.Commit(txId));
         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
-        processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
+        processResponses(futures, State.COMMIT_SENT, State.COMMITED);
     }
 
-    Optional<Future<Iterable<Object>>> abort() {
-        if (successfulFromPrevious != null) {
-            return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
+    Optional<List<Future<Object>>> abort() {
+        LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
+        state = State.ABORTED;
+        if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) {
+            return Optional.empty();
         }
 
-        return Optional.empty();
+        final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
+        final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
+        for (Success s : successfulFromPrevious) {
+            futures.add(Patterns.ask(s.getCohort(), message, timeout));
+        }
+        return Optional.of(futures);
     }
 
-    private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
-        return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
-                cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
+    private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
+        LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
+
+        final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
+        for (Success s : successfulFromPrevious) {
+            final ActorRef actor = s.getCohort();
+            ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
+        }
+        return ret;
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
+    private void processResponses(final List<Entry<ActorRef, Future<Object>>> futures, final State currentState,
             final State afterState) throws TimeoutException, ExecutionException {
+        LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+
         final Iterable<Object> results;
         try {
-            results = Await.result(resultsFuture, timeout.duration());
+            results = Await.result(Futures.sequence(Lists.transform(futures, e -> e.getValue()),
+                ExecutionContexts.global()), timeout.duration());
+        } catch (TimeoutException e) {
+            successfulFromPrevious = null;
+            LOG.debug("{}: processResponses - error from Future", txId, e);
+
+            for (Entry<ActorRef, Future<Object>> f : futures) {
+                if (!f.getValue().isCompleted()) {
+                    LOG.info("{}: actor {} failed to respond", txId, f.getKey());
+                }
+            }
+            throw e;
+        } catch (ExecutionException e) {
+            successfulFromPrevious = null;
+            LOG.debug("{}: processResponses - error from Future", txId, e);
+            throw e;
         } catch (Exception e) {
             successfulFromPrevious = null;
-            Throwables.propagateIfInstanceOf(e, TimeoutException.class);
-            throw Throwables.propagate(e);
+            LOG.debug("{}: processResponses - error from Future", txId, e);
+            throw new ExecutionException(e);
         }
-        Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
-        Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
+
+        final Collection<Failure> failed = new ArrayList<>(1);
+        final List<Success> successful = new ArrayList<>(futures.size());
+        for (Object result : results) {
+            if (result instanceof DataTreeCohortActor.Success) {
+                successful.add((Success) result);
+            } else if (result instanceof Status.Failure) {
+                failed.add((Failure) result);
+            } else {
+                LOG.warn("{}: unrecognized response {}, ignoring it", result);
+            }
+        }
+
+        LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
+
         successfulFromPrevious = successful;
-        if (!Iterables.isEmpty(failed)) {
+        if (!failed.isEmpty()) {
             changeStateFrom(currentState, State.FAILED);
-            Iterator<Failure> it = failed.iterator();
-            Throwable firstEx = it.next().cause();
+            final Iterator<Failure> it = failed.iterator();
+            final Throwable firstEx = it.next().cause();
             while (it.hasNext()) {
                 firstEx.addSuppressed(it.next().cause());
             }
-            Throwables.propagateIfPossible(firstEx, ExecutionException.class);
-            Throwables.propagateIfPossible(firstEx, TimeoutException.class);
-            throw Throwables.propagate(firstEx);
+            Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class);
+            Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class);
+            throw new ExecutionException(firstEx);
         }
         changeStateFrom(currentState, afterState);
     }