BUG-8219: Cleanup CompositeDataTreeCohort 59/55859/1
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 21 Apr 2017 14:44:10 +0000 (16:44 +0200)
committerRobert Varga <nite@hq.sk>
Sun, 23 Apr 2017 12:42:10 +0000 (12:42 +0000)
This patch reworks the logic so we can track which cohort times
out in case that happens. We also instantiate shortcuts so we do
not go through asynchronous processing if there are no cohorts
at all.

Change-Id: I9493b768c86e8d6b2d0f4f1d13f53b13ff98fe7b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 9155298250e0fbfc0534ab5553fc562289be268b)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java

index 4044a4c..a84fa46 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.dispatch.ExecutionContexts;
@@ -17,9 +18,14 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -82,19 +88,19 @@ class CompositeDataTreeCohort {
         ABORTED
     }
 
-    protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
+    static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
         @Override
-        public Failure recover(final Throwable error) throws Throwable {
+        public Failure recover(final Throwable error) {
             return new Failure(error);
         }
     };
 
-
     private final DataTreeCohortActorRegistry registry;
     private final TransactionIdentifier txId;
     private final SchemaContext schema;
     private final Timeout timeout;
-    private Iterable<Success> successfulFromPrevious;
+
+    private List<Success> successfulFromPrevious;
     private State state = State.IDLE;
 
     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
@@ -114,8 +120,13 @@ class CompositeDataTreeCohort {
             case COMMIT_SENT:
                 abort();
                 break;
-            default :
+            case ABORTED:
+            case COMMITED:
+            case FAILED:
+            case IDLE:
                 break;
+            default:
+                throw new IllegalStateException("Unhandled state " + state);
         }
 
         successfulFromPrevious = null;
@@ -123,86 +134,139 @@ class CompositeDataTreeCohort {
     }
 
     void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
-        LOG.debug("{}: canCommit -  candidate: {}", txId, tip);
-
-        Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+        LOG.debug("{}: canCommit - candidate: {}", txId, tip);
 
+        final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
         LOG.debug("{}: canCommit - messages: {}", txId, messages);
+        if (messages.isEmpty()) {
+            successfulFromPrevious = ImmutableList.of();
+            changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
+            return;
+        }
+
+        final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
+        for (CanCommit message : messages) {
+            final ActorRef actor = message.getCohort();
+            final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
+                ExecutionContexts.global());
+            LOG.trace("{}: requesting canCommit from {}", txId, actor);
+            futures.add(new SimpleImmutableEntry<>(actor, future));
+        }
 
-        // FIXME: Optimize empty collection list with pre-created futures, containing success.
-        Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
-            input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
-                    ExecutionContexts.global()), ExecutionContexts.global());
         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
-        processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
+        processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
     }
 
     void preCommit() throws ExecutionException, TimeoutException {
         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
 
         Preconditions.checkState(successfulFromPrevious != null);
-        Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
+        if (successfulFromPrevious.isEmpty()) {
+            changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
+            return;
+        }
+
+        final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
+            new DataTreeCohortActor.PreCommit(txId));
         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
-        processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
+        processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
     }
 
     void commit() throws ExecutionException, TimeoutException {
         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+        if (successfulFromPrevious.isEmpty()) {
+            changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
+            return;
+        }
 
         Preconditions.checkState(successfulFromPrevious != null);
-        Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
+        final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
+            new DataTreeCohortActor.Commit(txId));
         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
-        processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
+        processResponses(futures, State.COMMIT_SENT, State.COMMITED);
     }
 
-    Optional<Future<Iterable<Object>>> abort() {
+    Optional<List<Future<Object>>> abort() {
         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
 
         state = State.ABORTED;
-        if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) {
-            return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
+        if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) {
+            return Optional.empty();
         }
 
-        return Optional.empty();
+        final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
+        final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
+        for (Success s : successfulFromPrevious) {
+            futures.add(Patterns.ask(s.getCohort(), message, timeout));
+        }
+        return Optional.of(futures);
     }
 
-    private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
+    private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
         LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
 
-        return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
-                cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
+        final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
+        for (Success s : successfulFromPrevious) {
+            final ActorRef actor = s.getCohort();
+            ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
+        }
+        return ret;
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
+    private void processResponses(final List<Entry<ActorRef, Future<Object>>> futures, final State currentState,
             final State afterState) throws TimeoutException, ExecutionException {
         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
 
         final Iterable<Object> results;
         try {
-            results = Await.result(resultsFuture, timeout.duration());
+            results = Await.result(Futures.sequence(Lists.transform(futures, e -> e.getValue()),
+                ExecutionContexts.global()), timeout.duration());
+        } catch (TimeoutException e) {
+            successfulFromPrevious = null;
+            LOG.debug("{}: processResponses - error from Future", txId, e);
+
+            for (Entry<ActorRef, Future<Object>> f : futures) {
+                if (!f.getValue().isCompleted()) {
+                    LOG.info("{}: actor {} failed to respond", txId, f.getKey());
+                }
+            }
+            throw e;
+        } catch (ExecutionException e) {
+            successfulFromPrevious = null;
+            LOG.debug("{}: processResponses - error from Future", txId, e);
+            throw e;
         } catch (Exception e) {
             successfulFromPrevious = null;
             LOG.debug("{}: processResponses - error from Future", txId, e);
-            Throwables.propagateIfInstanceOf(e, TimeoutException.class);
-            throw Throwables.propagate(e);
+            throw new ExecutionException(e);
+        }
+
+        final Collection<Failure> failed = new ArrayList<>(1);
+        final List<Success> successful = new ArrayList<>(futures.size());
+        for (Object result : results) {
+            if (result instanceof DataTreeCohortActor.Success) {
+                successful.add((Success) result);
+            } else if (result instanceof Status.Failure) {
+                failed.add((Failure) result);
+            } else {
+                LOG.warn("{}: unrecognized response {}, ignoring it", result);
+            }
         }
-        Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
-        Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
 
         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
 
         successfulFromPrevious = successful;
-        if (!Iterables.isEmpty(failed)) {
+        if (!failed.isEmpty()) {
             changeStateFrom(currentState, State.FAILED);
-            Iterator<Failure> it = failed.iterator();
-            Throwable firstEx = it.next().cause();
+            final Iterator<Failure> it = failed.iterator();
+            final Throwable firstEx = it.next().cause();
             while (it.hasNext()) {
                 firstEx.addSuppressed(it.next().cause());
             }
-            Throwables.propagateIfPossible(firstEx, ExecutionException.class);
-            Throwables.propagateIfPossible(firstEx, TimeoutException.class);
-            throw Throwables.propagate(firstEx);
+            Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class);
+            Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class);
+            throw new ExecutionException(firstEx);
         }
         changeStateFrom(currentState, afterState);
     }
index 05ebe55..0e149fa 100644 (file)
@@ -82,7 +82,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
         cohort.tell(PoisonPill.getInstance(), cohort);
     }
 
-    Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(final TransactionIdentifier txId,
+    List<DataTreeCohortActor.CanCommit> createCanCommitMessages(final TransactionIdentifier txId,
             final DataTreeCandidate candidate, final SchemaContext schema) {
         try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
             return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
@@ -204,12 +204,12 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
             return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path);
         }
 
-        private Collection<DataTreeCohortActor.CanCommit> perform(final RegistrationTreeNode<ActorRef> rootNode) {
+        List<DataTreeCohortActor.CanCommit> perform(final RegistrationTreeNode<ActorRef> rootNode) {
             final List<PathArgument> toLookup = candidate.getRootPath().getPathArguments();
             lookupAndCreateCanCommits(toLookup, 0, rootNode);
 
             final Map<ActorRef, Collection<DOMDataTreeCandidate>> mapView = actorToCandidates.asMap();
-            Collection<DataTreeCohortActor.CanCommit> messages = new ArrayList<>(mapView.size());
+            final List<DataTreeCohortActor.CanCommit> messages = new ArrayList<>(mapView.size());
             for (Map.Entry<ActorRef, Collection<DOMDataTreeCandidate>> entry: mapView.entrySet()) {
                 messages.add(new DataTreeCohortActor.CanCommit(txId, entry.getValue(), schema, entry.getKey()));
             }
index 2a8fdbe..b9e3997 100644 (file)
@@ -8,11 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -126,13 +128,13 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         candidate = null;
         state = State.ABORTED;
 
-        final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
+        final Optional<List<Future<Object>>> maybeAborts = userCohorts.abort();
         if (!maybeAborts.isPresent()) {
             abortCallback.onSuccess(null);
             return;
         }
 
-        final Future<Iterable<Object>> aborts = maybeAborts.get();
+        final Future<Iterable<Object>> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global());
         if (aborts.isCompleted()) {
             abortCallback.onSuccess(null);
             return;
index 5b21871..1e01537 100644 (file)
@@ -230,7 +230,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
 
     @Test
     public void testAbort() throws Exception {
-        doReturn(true).when(mockShardDataTree).startAbort(cohort);
+        doReturn(Boolean.TRUE).when(mockShardDataTree).startAbort(cohort);
 
         abort(cohort).get();
         verify(mockShardDataTree).startAbort(cohort);
@@ -241,7 +241,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
         doReturn(true).when(mockShardDataTree).startAbort(cohort);
 
         final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
-        doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();
+        doReturn(Optional.of(Collections.singletonList(cohortFuture.future()))).when(mockUserCohorts).abort();
 
         final Future<?> abortFuture = abort(cohort);
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.