From: Robert Varga Date: Fri, 21 Apr 2017 14:44:10 +0000 (+0200) Subject: BUG-8219: Cleanup CompositeDataTreeCohort X-Git-Tag: release/nitrogen~318 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=0b3bfca7f39ee1bb6dcf5379f44d0b402adeb7fe BUG-8219: Cleanup CompositeDataTreeCohort This patch reworks the logic so we can track which cohort times out in case that happens. We also instantiate shortcuts so we do not go through asynchronous processing if there are no cohorts at all. Change-Id: I9493b768c86e8d6b2d0f4f1d13f53b13ff98fe7b Signed-off-by: Robert Varga (cherry picked from commit 9155298250e0fbfc0534ab5553fc562289be268b) --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java index 4044a4cb3a..a84fa4610d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; import akka.actor.Status; import akka.actor.Status.Failure; import akka.dispatch.ExecutionContexts; @@ -17,9 +18,14 @@ import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -82,19 +88,19 @@ class CompositeDataTreeCohort { ABORTED } - protected static final Recover EXCEPTION_TO_MESSAGE = new Recover() { + static final Recover EXCEPTION_TO_MESSAGE = new Recover() { @Override - public Failure recover(final Throwable error) throws Throwable { + public Failure recover(final Throwable error) { return new Failure(error); } }; - private final DataTreeCohortActorRegistry registry; private final TransactionIdentifier txId; private final SchemaContext schema; private final Timeout timeout; - private Iterable successfulFromPrevious; + + private List successfulFromPrevious; private State state = State.IDLE; CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID, @@ -114,8 +120,13 @@ class CompositeDataTreeCohort { case COMMIT_SENT: abort(); break; - default : + case ABORTED: + case COMMITED: + case FAILED: + case IDLE: break; + default: + throw new IllegalStateException("Unhandled state " + state); } successfulFromPrevious = null; @@ -123,86 +134,139 @@ class CompositeDataTreeCohort { } void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException { - LOG.debug("{}: canCommit - candidate: {}", txId, tip); - - Collection messages = registry.createCanCommitMessages(txId, tip, schema); + LOG.debug("{}: canCommit - candidate: {}", txId, tip); + final List messages = registry.createCanCommitMessages(txId, tip, schema); LOG.debug("{}: canCommit - messages: {}", txId, messages); + if (messages.isEmpty()) { + successfulFromPrevious = ImmutableList.of(); + changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL); + return; + } + + final List>> futures = new ArrayList<>(messages.size()); + for (CanCommit message : messages) { + final ActorRef actor = message.getCohort(); + final Future future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE, + ExecutionContexts.global()); + LOG.trace("{}: requesting canCommit from {}", txId, actor); + futures.add(new SimpleImmutableEntry<>(actor, future)); + } - // FIXME: Optimize empty collection list with pre-created futures, containing success. - Future> canCommitsFuture = Futures.traverse(messages, - input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE, - ExecutionContexts.global()), ExecutionContexts.global()); changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT); - processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL); + processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL); } void preCommit() throws ExecutionException, TimeoutException { LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious); Preconditions.checkState(successfulFromPrevious != null); - Future> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId)); + if (successfulFromPrevious.isEmpty()) { + changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL); + return; + } + + final List>> futures = sendMessageToSuccessful( + new DataTreeCohortActor.PreCommit(txId)); changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT); - processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL); + processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL); } void commit() throws ExecutionException, TimeoutException { LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious); + if (successfulFromPrevious.isEmpty()) { + changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED); + return; + } Preconditions.checkState(successfulFromPrevious != null); - Future> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId)); + final List>> futures = sendMessageToSuccessful( + new DataTreeCohortActor.Commit(txId)); changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT); - processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED); + processResponses(futures, State.COMMIT_SENT, State.COMMITED); } - Optional>> abort() { + Optional>> abort() { LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious); state = State.ABORTED; - if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) { - return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId))); + if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) { + return Optional.empty(); } - return Optional.empty(); + final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId); + final List> futures = new ArrayList<>(successfulFromPrevious.size()); + for (Success s : successfulFromPrevious) { + futures.add(Patterns.ask(s.getCohort(), message, timeout)); + } + return Optional.of(futures); } - private Future> sendMesageToSuccessful(final Object message) { + private List>> sendMessageToSuccessful(final Object message) { LOG.debug("{}: sendMesageToSuccessful: {}", txId, message); - return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask( - cohortResponse.getCohort(), message, timeout), ExecutionContexts.global()); + final List>> ret = new ArrayList<>(successfulFromPrevious.size()); + for (Success s : successfulFromPrevious) { + final ActorRef actor = s.getCohort(); + ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout))); + } + return ret; } @SuppressWarnings("checkstyle:IllegalCatch") - private void processResponses(final Future> resultsFuture, final State currentState, + private void processResponses(final List>> futures, final State currentState, final State afterState) throws TimeoutException, ExecutionException { LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState); final Iterable results; try { - results = Await.result(resultsFuture, timeout.duration()); + results = Await.result(Futures.sequence(Lists.transform(futures, e -> e.getValue()), + ExecutionContexts.global()), timeout.duration()); + } catch (TimeoutException e) { + successfulFromPrevious = null; + LOG.debug("{}: processResponses - error from Future", txId, e); + + for (Entry> f : futures) { + if (!f.getValue().isCompleted()) { + LOG.info("{}: actor {} failed to respond", txId, f.getKey()); + } + } + throw e; + } catch (ExecutionException e) { + successfulFromPrevious = null; + LOG.debug("{}: processResponses - error from Future", txId, e); + throw e; } catch (Exception e) { successfulFromPrevious = null; LOG.debug("{}: processResponses - error from Future", txId, e); - Throwables.propagateIfInstanceOf(e, TimeoutException.class); - throw Throwables.propagate(e); + throw new ExecutionException(e); + } + + final Collection failed = new ArrayList<>(1); + final List successful = new ArrayList<>(futures.size()); + for (Object result : results) { + if (result instanceof DataTreeCohortActor.Success) { + successful.add((Success) result); + } else if (result instanceof Status.Failure) { + failed.add((Failure) result); + } else { + LOG.warn("{}: unrecognized response {}, ignoring it", result); + } } - Iterable failed = Iterables.filter(results, Status.Failure.class); - Iterable successful = Iterables.filter(results, DataTreeCohortActor.Success.class); LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed); successfulFromPrevious = successful; - if (!Iterables.isEmpty(failed)) { + if (!failed.isEmpty()) { changeStateFrom(currentState, State.FAILED); - Iterator it = failed.iterator(); - Throwable firstEx = it.next().cause(); + final Iterator it = failed.iterator(); + final Throwable firstEx = it.next().cause(); while (it.hasNext()) { firstEx.addSuppressed(it.next().cause()); } - Throwables.propagateIfPossible(firstEx, ExecutionException.class); - Throwables.propagateIfPossible(firstEx, TimeoutException.class); - throw Throwables.propagate(firstEx); + Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class); + Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class); + throw new ExecutionException(firstEx); } changeStateFrom(currentState, afterState); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java index 05ebe55bc6..0e149fa91c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java @@ -82,7 +82,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { cohort.tell(PoisonPill.getInstance(), cohort); } - Collection createCanCommitMessages(final TransactionIdentifier txId, + List createCanCommitMessages(final TransactionIdentifier txId, final DataTreeCandidate candidate, final SchemaContext schema) { try (RegistrationTreeSnapshot cohorts = takeSnapshot()) { return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode()); @@ -204,12 +204,12 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path); } - private Collection perform(final RegistrationTreeNode rootNode) { + List perform(final RegistrationTreeNode rootNode) { final List toLookup = candidate.getRootPath().getPathArguments(); lookupAndCreateCanCommits(toLookup, 0, rootNode); final Map> mapView = actorToCandidates.asMap(); - Collection messages = new ArrayList<>(mapView.size()); + final List messages = new ArrayList<>(mapView.size()); for (Map.Entry> entry: mapView.entrySet()) { messages.add(new DataTreeCohortActor.CanCommit(txId, entry.getValue(), schema, entry.getKey())); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java index 2a8fdbe3db..b9e39975e5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -8,11 +8,13 @@ package org.opendaylight.controller.cluster.datastore; import akka.dispatch.ExecutionContexts; +import akka.dispatch.Futures; import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; +import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -126,13 +128,13 @@ abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort { candidate = null; state = State.ABORTED; - final Optional>> maybeAborts = userCohorts.abort(); + final Optional>> maybeAborts = userCohorts.abort(); if (!maybeAborts.isPresent()) { abortCallback.onSuccess(null); return; } - final Future> aborts = maybeAborts.get(); + final Future> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global()); if (aborts.isCompleted()) { abortCallback.onSuccess(null); return; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java index 5b21871573..1e01537431 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java @@ -230,7 +230,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest { @Test public void testAbort() throws Exception { - doReturn(true).when(mockShardDataTree).startAbort(cohort); + doReturn(Boolean.TRUE).when(mockShardDataTree).startAbort(cohort); abort(cohort).get(); verify(mockShardDataTree).startAbort(cohort); @@ -241,7 +241,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest { doReturn(true).when(mockShardDataTree).startAbort(cohort); final Promise> cohortFuture = akka.dispatch.Futures.promise(); - doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort(); + doReturn(Optional.of(Collections.singletonList(cohortFuture.future()))).when(mockUserCohorts).abort(); final Future abortFuture = abort(cohort);