BUG-8219: Cleanup CompositeDataTreeCohort 59/55859/1
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 21 Apr 2017 14:44:10 +0000 (16:44 +0200)
committerRobert Varga <nite@hq.sk>
Sun, 23 Apr 2017 12:42:10 +0000 (12:42 +0000)
This patch reworks the logic so we can track which cohort times
out in case that happens. We also instantiate shortcuts so we do
not go through asynchronous processing if there are no cohorts
at all.

Change-Id: I9493b768c86e8d6b2d0f4f1d13f53b13ff98fe7b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 9155298250e0fbfc0534ab5553fc562289be268b)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java

index 4044a4cb3a0ec991ca429a1d5c3dac63db426768..a84fa4610d5763c9ab84ae73687640f1236024f5 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.dispatch.ExecutionContexts;
@@ -17,9 +18,14 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -82,19 +88,19 @@ class CompositeDataTreeCohort {
         ABORTED
     }
 
-    protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
+    static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
         @Override
-        public Failure recover(final Throwable error) throws Throwable {
+        public Failure recover(final Throwable error) {
             return new Failure(error);
         }
     };
 
-
     private final DataTreeCohortActorRegistry registry;
     private final TransactionIdentifier txId;
     private final SchemaContext schema;
     private final Timeout timeout;
-    private Iterable<Success> successfulFromPrevious;
+
+    private List<Success> successfulFromPrevious;
     private State state = State.IDLE;
 
     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
@@ -114,8 +120,13 @@ class CompositeDataTreeCohort {
             case COMMIT_SENT:
                 abort();
                 break;
-            default :
+            case ABORTED:
+            case COMMITED:
+            case FAILED:
+            case IDLE:
                 break;
+            default:
+                throw new IllegalStateException("Unhandled state " + state);
         }
 
         successfulFromPrevious = null;
@@ -123,86 +134,139 @@ class CompositeDataTreeCohort {
     }
 
     void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
-        LOG.debug("{}: canCommit -  candidate: {}", txId, tip);
-
-        Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+        LOG.debug("{}: canCommit - candidate: {}", txId, tip);
 
+        final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
         LOG.debug("{}: canCommit - messages: {}", txId, messages);
+        if (messages.isEmpty()) {
+            successfulFromPrevious = ImmutableList.of();
+            changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
+            return;
+        }
+
+        final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
+        for (CanCommit message : messages) {
+            final ActorRef actor = message.getCohort();
+            final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
+                ExecutionContexts.global());
+            LOG.trace("{}: requesting canCommit from {}", txId, actor);
+            futures.add(new SimpleImmutableEntry<>(actor, future));
+        }
 
-        // FIXME: Optimize empty collection list with pre-created futures, containing success.
-        Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
-            input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
-                    ExecutionContexts.global()), ExecutionContexts.global());
         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
-        processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
+        processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
     }
 
     void preCommit() throws ExecutionException, TimeoutException {
         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
 
         Preconditions.checkState(successfulFromPrevious != null);
-        Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
+        if (successfulFromPrevious.isEmpty()) {
+            changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
+            return;
+        }
+
+        final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
+            new DataTreeCohortActor.PreCommit(txId));
         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
-        processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
+        processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
     }
 
     void commit() throws ExecutionException, TimeoutException {
         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+        if (successfulFromPrevious.isEmpty()) {
+            changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
+            return;
+        }
 
         Preconditions.checkState(successfulFromPrevious != null);
-        Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
+        final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
+            new DataTreeCohortActor.Commit(txId));
         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
-        processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
+        processResponses(futures, State.COMMIT_SENT, State.COMMITED);
     }
 
-    Optional<Future<Iterable<Object>>> abort() {
+    Optional<List<Future<Object>>> abort() {
         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
 
         state = State.ABORTED;
-        if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) {
-            return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
+        if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) {
+            return Optional.empty();
         }
 
-        return Optional.empty();
+        final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
+        final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
+        for (Success s : successfulFromPrevious) {
+            futures.add(Patterns.ask(s.getCohort(), message, timeout));
+        }
+        return Optional.of(futures);
     }
 
-    private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
+    private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
         LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
 
-        return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
-                cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
+        final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
+        for (Success s : successfulFromPrevious) {
+            final ActorRef actor = s.getCohort();
+            ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
+        }
+        return ret;
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
+    private void processResponses(final List<Entry<ActorRef, Future<Object>>> futures, final State currentState,
             final State afterState) throws TimeoutException, ExecutionException {
         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
 
         final Iterable<Object> results;
         try {
-            results = Await.result(resultsFuture, timeout.duration());
+            results = Await.result(Futures.sequence(Lists.transform(futures, e -> e.getValue()),
+                ExecutionContexts.global()), timeout.duration());
+        } catch (TimeoutException e) {
+            successfulFromPrevious = null;
+            LOG.debug("{}: processResponses - error from Future", txId, e);
+
+            for (Entry<ActorRef, Future<Object>> f : futures) {
+                if (!f.getValue().isCompleted()) {
+                    LOG.info("{}: actor {} failed to respond", txId, f.getKey());
+                }
+            }
+            throw e;
+        } catch (ExecutionException e) {
+            successfulFromPrevious = null;
+            LOG.debug("{}: processResponses - error from Future", txId, e);
+            throw e;
         } catch (Exception e) {
             successfulFromPrevious = null;
             LOG.debug("{}: processResponses - error from Future", txId, e);
-            Throwables.propagateIfInstanceOf(e, TimeoutException.class);
-            throw Throwables.propagate(e);
+            throw new ExecutionException(e);
+        }
+
+        final Collection<Failure> failed = new ArrayList<>(1);
+        final List<Success> successful = new ArrayList<>(futures.size());
+        for (Object result : results) {
+            if (result instanceof DataTreeCohortActor.Success) {
+                successful.add((Success) result);
+            } else if (result instanceof Status.Failure) {
+                failed.add((Failure) result);
+            } else {
+                LOG.warn("{}: unrecognized response {}, ignoring it", result);
+            }
         }
-        Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
-        Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
 
         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
 
         successfulFromPrevious = successful;
-        if (!Iterables.isEmpty(failed)) {
+        if (!failed.isEmpty()) {
             changeStateFrom(currentState, State.FAILED);
-            Iterator<Failure> it = failed.iterator();
-            Throwable firstEx = it.next().cause();
+            final Iterator<Failure> it = failed.iterator();
+            final Throwable firstEx = it.next().cause();
             while (it.hasNext()) {
                 firstEx.addSuppressed(it.next().cause());
             }
-            Throwables.propagateIfPossible(firstEx, ExecutionException.class);
-            Throwables.propagateIfPossible(firstEx, TimeoutException.class);
-            throw Throwables.propagate(firstEx);
+            Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class);
+            Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class);
+            throw new ExecutionException(firstEx);
         }
         changeStateFrom(currentState, afterState);
     }
index 05ebe55bc6b6713932100c35b0f8cce6884fae8f..0e149fa91cc9800e177a3483ae272b2c16d48c10 100644 (file)
@@ -82,7 +82,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
         cohort.tell(PoisonPill.getInstance(), cohort);
     }
 
-    Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(final TransactionIdentifier txId,
+    List<DataTreeCohortActor.CanCommit> createCanCommitMessages(final TransactionIdentifier txId,
             final DataTreeCandidate candidate, final SchemaContext schema) {
         try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
             return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
@@ -204,12 +204,12 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
             return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path);
         }
 
-        private Collection<DataTreeCohortActor.CanCommit> perform(final RegistrationTreeNode<ActorRef> rootNode) {
+        List<DataTreeCohortActor.CanCommit> perform(final RegistrationTreeNode<ActorRef> rootNode) {
             final List<PathArgument> toLookup = candidate.getRootPath().getPathArguments();
             lookupAndCreateCanCommits(toLookup, 0, rootNode);
 
             final Map<ActorRef, Collection<DOMDataTreeCandidate>> mapView = actorToCandidates.asMap();
-            Collection<DataTreeCohortActor.CanCommit> messages = new ArrayList<>(mapView.size());
+            final List<DataTreeCohortActor.CanCommit> messages = new ArrayList<>(mapView.size());
             for (Map.Entry<ActorRef, Collection<DOMDataTreeCandidate>> entry: mapView.entrySet()) {
                 messages.add(new DataTreeCohortActor.CanCommit(txId, entry.getValue(), schema, entry.getKey()));
             }
index 2a8fdbe3dbdfbd6d0b675c311de83d5528ae6686..b9e39975e5c869529a31fa2a56dad6e0c0db9378 100644 (file)
@@ -8,11 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -126,13 +128,13 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         candidate = null;
         state = State.ABORTED;
 
-        final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
+        final Optional<List<Future<Object>>> maybeAborts = userCohorts.abort();
         if (!maybeAborts.isPresent()) {
             abortCallback.onSuccess(null);
             return;
         }
 
-        final Future<Iterable<Object>> aborts = maybeAborts.get();
+        final Future<Iterable<Object>> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global());
         if (aborts.isCompleted()) {
             abortCallback.onSuccess(null);
             return;
index 5b218715737b6a34032aacc0beb01826e2da1444..1e0153743115259f817b211591b1441420bf32a8 100644 (file)
@@ -230,7 +230,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
 
     @Test
     public void testAbort() throws Exception {
-        doReturn(true).when(mockShardDataTree).startAbort(cohort);
+        doReturn(Boolean.TRUE).when(mockShardDataTree).startAbort(cohort);
 
         abort(cohort).get();
         verify(mockShardDataTree).startAbort(cohort);
@@ -241,7 +241,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
         doReturn(true).when(mockShardDataTree).startAbort(cohort);
 
         final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
-        doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();
+        doReturn(Optional.of(Collections.singletonList(cohortFuture.future()))).when(mockUserCohorts).abort();
 
         final Future<?> abortFuture = abort(cohort);