X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FCompositeDataTreeCohort.java;h=bca00ebc4a6bdbb88cd928ffe609bdb6ae1acd5f;hp=05b9981113a039a75b3679698d9ddd61e8d97318;hb=b65e66f7b1bafb0d0c5fbe1c569835eb890f672a;hpb=925cb4a228d0fda99c7bfeb432eb25285a223887 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java index 05b9981113..bca00ebc4a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java @@ -5,30 +5,42 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorRef; import akka.actor.Status; import akka.actor.Status.Failure; import akka.dispatch.ExecutionContexts; import akka.dispatch.Futures; +import akka.dispatch.OnComplete; import akka.dispatch.Recover; import akka.pattern.Patterns; import akka.util.Timeout; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import scala.concurrent.Await; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.compat.java8.FutureConverters; import scala.concurrent.Future; /** @@ -39,6 +51,7 @@ import scala.concurrent.Future; * */ class CompositeDataTreeCohort { + private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class); private enum State { /** @@ -70,7 +83,7 @@ class CompositeDataTreeCohort { */ COMMITED, /** - * Some of cohorts responsed back with unsuccessful message. + * Some of cohorts responded back with unsuccessful message. */ FAILED, /** @@ -79,96 +92,199 @@ class CompositeDataTreeCohort { ABORTED } - protected static final Recover EXCEPTION_TO_MESSAGE = new Recover() { + static final Recover EXCEPTION_TO_MESSAGE = new Recover() { @Override - public Failure recover(final Throwable error) throws Throwable { + public Failure recover(final Throwable error) { return new Failure(error); } }; - private final DataTreeCohortActorRegistry registry; private final TransactionIdentifier txId; private final SchemaContext schema; + private final Executor callbackExecutor; private final Timeout timeout; - private Iterable successfulFromPrevious; + + private @NonNull List successfulFromPrevious = Collections.emptyList(); private State state = State.IDLE; CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID, - final SchemaContext schema, final Timeout timeout) { - this.registry = Preconditions.checkNotNull(registry); - this.txId = Preconditions.checkNotNull(transactionID); - this.schema = Preconditions.checkNotNull(schema); - this.timeout = Preconditions.checkNotNull(timeout); + final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) { + this.registry = requireNonNull(registry); + this.txId = requireNonNull(transactionID); + this.schema = requireNonNull(schema); + this.callbackExecutor = requireNonNull(callbackExecutor); + this.timeout = requireNonNull(timeout); } - void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException { - Collection messages = registry.createCanCommitMessages(txId, tip, schema); - // FIXME: Optimize empty collection list with pre-created futures, containing success. - Future> canCommitsFuture = Futures.traverse(messages, - input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE, - ExecutionContexts.global()), ExecutionContexts.global()); + void reset() { + switch (state) { + case CAN_COMMIT_SENT: + case CAN_COMMIT_SUCCESSFUL: + case PRE_COMMIT_SENT: + case PRE_COMMIT_SUCCESSFUL: + case COMMIT_SENT: + abort(); + break; + case ABORTED: + case COMMITED: + case FAILED: + case IDLE: + break; + default: + throw new IllegalStateException("Unhandled state " + state); + } + + successfulFromPrevious = Collections.emptyList(); + state = State.IDLE; + } + + Optional> canCommit(final DataTreeCandidate tip) { + if (LOG.isTraceEnabled()) { + LOG.trace("{}: canCommit - candidate: {}", txId, tip); + } else { + LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath()); + } + + final List messages = registry.createCanCommitMessages(txId, tip, schema); + LOG.debug("{}: canCommit - messages: {}", txId, messages); + if (messages.isEmpty()) { + successfulFromPrevious = Collections.emptyList(); + changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL); + return Optional.empty(); + } + + final List>> futures = new ArrayList<>(messages.size()); + for (CanCommit message : messages) { + final ActorRef actor = message.getCohort(); + final Future future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE, + ExecutionContexts.global()); + LOG.trace("{}: requesting canCommit from {}", txId, actor); + futures.add(new SimpleImmutableEntry<>(actor, future)); + } + changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT); - processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL); + return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL)); } - void preCommit() throws ExecutionException, TimeoutException { - Preconditions.checkState(successfulFromPrevious != null); - Future> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId)); + Optional> preCommit() { + LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious); + + if (successfulFromPrevious.isEmpty()) { + changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL); + return Optional.empty(); + } + + final List>> futures = sendMessageToSuccessful( + new DataTreeCohortActor.PreCommit(txId)); changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT); - processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL); + return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL)); } - void commit() throws ExecutionException, TimeoutException { - Preconditions.checkState(successfulFromPrevious != null); - Future> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId)); + Optional> commit() { + LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious); + if (successfulFromPrevious.isEmpty()) { + changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED); + return Optional.empty(); + } + + final List>> futures = sendMessageToSuccessful( + new DataTreeCohortActor.Commit(txId)); changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT); - processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED); + return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED)); } - Optional>> abort() { - if (successfulFromPrevious != null) { - return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId))); + Optional> abort() { + LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious); + + state = State.ABORTED; + if (successfulFromPrevious.isEmpty()) { + return Optional.empty(); } - return Optional.empty(); + final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId); + final List> futures = new ArrayList<>(successfulFromPrevious.size()); + for (Success s : successfulFromPrevious) { + futures.add(Patterns.ask(s.getCohort(), message, timeout)); + } + + return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global()))); } - private Future> sendMesageToSuccessful(final Object message) { - return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask( - cohortResponse.getCohort(), message, timeout), ExecutionContexts.global()); + private List>> sendMessageToSuccessful(final Object message) { + LOG.debug("{}: sendMesageToSuccessful: {}", txId, message); + + final List>> ret = new ArrayList<>(successfulFromPrevious.size()); + for (Success s : successfulFromPrevious) { + final ActorRef actor = s.getCohort(); + ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout))); + } + return ret; } - @SuppressWarnings("checkstyle:IllegalCatch") - private void processResponses(final Future> resultsFuture, final State currentState, - final State afterState) throws TimeoutException, ExecutionException { - final Iterable results; - try { - results = Await.result(resultsFuture, timeout.duration()); - } catch (Exception e) { - successfulFromPrevious = null; - Throwables.propagateIfInstanceOf(e, TimeoutException.class); - throw Throwables.propagate(e); + private @NonNull CompletionStage processResponses(final List>> futures, + final State currentState, final State afterState) { + LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState); + final CompletableFuture returnFuture = new CompletableFuture<>(); + Future> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue), + ExecutionContexts.global()); + + aggregateFuture.onComplete(new OnComplete>() { + @Override + public void onComplete(final Throwable failure, final Iterable results) { + callbackExecutor.execute( + () -> processResponses(failure, results, currentState, afterState, returnFuture)); + } + }, ExecutionContexts.global()); + + return returnFuture; + } + + // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the + // generic type is Void. + @SuppressFBWarnings(value = { "NP_NONNULL_PARAM_VIOLATION", "UPM_UNCALLED_PRIVATE_METHOD" }, + justification = "https://github.com/spotbugs/spotbugs/issues/811") + private void processResponses(final Throwable failure, final Iterable results, + final State currentState, final State afterState, final CompletableFuture resultFuture) { + if (failure != null) { + successfulFromPrevious = Collections.emptyList(); + resultFuture.completeExceptionally(failure); + return; + } + + final Collection failed = new ArrayList<>(1); + final List successful = new ArrayList<>(); + for (Object result : results) { + if (result instanceof DataTreeCohortActor.Success) { + successful.add((Success) result); + } else if (result instanceof Status.Failure) { + failed.add((Failure) result); + } else { + LOG.warn("{}: unrecognized response {}, ignoring it", txId, result); + } } - Iterable failed = Iterables.filter(results, Status.Failure.class); - Iterable successful = Iterables.filter(results, DataTreeCohortActor.Success.class); - successfulFromPrevious = successful; - if (!Iterables.isEmpty(failed)) { + + LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed); + + if (!failed.isEmpty()) { changeStateFrom(currentState, State.FAILED); - Iterator it = failed.iterator(); - Throwable firstEx = it.next().cause(); + final Iterator it = failed.iterator(); + final Throwable firstEx = it.next().cause(); while (it.hasNext()) { firstEx.addSuppressed(it.next().cause()); } - Throwables.propagateIfPossible(firstEx, ExecutionException.class); - Throwables.propagateIfPossible(firstEx, TimeoutException.class); - throw Throwables.propagate(firstEx); + + successfulFromPrevious = Collections.emptyList(); + resultFuture.completeExceptionally(firstEx); + } else { + successfulFromPrevious = successful; + changeStateFrom(currentState, afterState); + resultFuture.complete(null); } - changeStateFrom(currentState, afterState); } void changeStateFrom(final State expected, final State followup) { - Preconditions.checkState(state == expected); + checkState(state == expected); state = followup; } }