X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FCompositeDataTreeCohort.java;h=4044a4cb3a0ec991ca429a1d5c3dac63db426768;hb=ec870dee9bacb971f11bc747b69e84ac37f5d746;hp=d833962277d9a1c3ee1785c2147737740649b6f5;hpb=4d1709660b7af992d4c382a2a38debb5c7d64fb9;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java index d833962277..4044a4cb3a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java @@ -13,7 +13,6 @@ import akka.actor.Status.Failure; import akka.dispatch.ExecutionContexts; import akka.dispatch.Futures; import akka.dispatch.Recover; -import akka.japi.Function; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; @@ -21,25 +20,28 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import java.util.Collection; import java.util.Iterator; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; /** - * * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort. - * + *

* It tracks current operation and list of cohorts which successfuly finished previous phase in * case, if abort is necessary to invoke it only on cohort steps which are still active. * */ class CompositeDataTreeCohort { + private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class); private enum State { /** @@ -71,21 +73,18 @@ class CompositeDataTreeCohort { */ COMMITED, /** - * Some of cohorts responsed back with unsuccessful message. - * + * Some of cohorts responded back with unsuccessful message. */ FAILED, /** - * * Abort message was send to all cohorts which responded with success previously. - * */ ABORTED } protected static final Recover EXCEPTION_TO_MESSAGE = new Recover() { @Override - public Failure recover(Throwable error) throws Throwable { + public Failure recover(final Throwable error) throws Throwable { return new Failure(error); } }; @@ -98,30 +97,49 @@ class CompositeDataTreeCohort { private Iterable successfulFromPrevious; private State state = State.IDLE; - CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, TransactionIdentifier transactionID, - SchemaContext schema, Timeout timeout) { + CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID, + final SchemaContext schema, final Timeout timeout) { this.registry = Preconditions.checkNotNull(registry); this.txId = Preconditions.checkNotNull(transactionID); this.schema = Preconditions.checkNotNull(schema); this.timeout = Preconditions.checkNotNull(timeout); } - void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException { + void reset() { + switch (state) { + case CAN_COMMIT_SENT: + case CAN_COMMIT_SUCCESSFUL: + case PRE_COMMIT_SENT: + case PRE_COMMIT_SUCCESSFUL: + case COMMIT_SENT: + abort(); + break; + default : + break; + } + + successfulFromPrevious = null; + state = State.IDLE; + } + + void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException { + LOG.debug("{}: canCommit - candidate: {}", txId, tip); + Collection messages = registry.createCanCommitMessages(txId, tip, schema); + + LOG.debug("{}: canCommit - messages: {}", txId, messages); + // FIXME: Optimize empty collection list with pre-created futures, containing success. - Future> canCommitsFuture = - Futures.traverse(messages, new Function>() { - @Override - public Future apply(CanCommit input) { - return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE, - ExecutionContexts.global()); - } - }, ExecutionContexts.global()); + Future> canCommitsFuture = Futures.traverse(messages, + input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE, + ExecutionContexts.global()), ExecutionContexts.global()); changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT); processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL); } void preCommit() throws ExecutionException, TimeoutException { + LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious); + Preconditions.checkState(successfulFromPrevious != null); Future> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId)); changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT); @@ -129,41 +147,51 @@ class CompositeDataTreeCohort { } void commit() throws ExecutionException, TimeoutException { + LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious); + Preconditions.checkState(successfulFromPrevious != null); Future> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId)); changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT); processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED); } - void abort() throws TimeoutException { - if (successfulFromPrevious != null) { - sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)); + Optional>> abort() { + LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious); + + state = State.ABORTED; + if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) { + return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId))); } + + return Optional.empty(); } private Future> sendMesageToSuccessful(final Object message) { - return Futures.traverse(successfulFromPrevious, new Function>() { + LOG.debug("{}: sendMesageToSuccessful: {}", txId, message); - @Override - public Future apply(DataTreeCohortActor.Success cohortResponse) throws Exception { - return Patterns.ask(cohortResponse.getCohort(), message, timeout); - } - - }, ExecutionContexts.global()); + return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask( + cohortResponse.getCohort(), message, timeout), ExecutionContexts.global()); } - private void processResponses(Future> resultsFuture, State currentState, State afterState) - throws TimeoutException, ExecutionException { + @SuppressWarnings("checkstyle:IllegalCatch") + private void processResponses(final Future> resultsFuture, final State currentState, + final State afterState) throws TimeoutException, ExecutionException { + LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState); + final Iterable results; try { results = Await.result(resultsFuture, timeout.duration()); } catch (Exception e) { successfulFromPrevious = null; + LOG.debug("{}: processResponses - error from Future", txId, e); Throwables.propagateIfInstanceOf(e, TimeoutException.class); throw Throwables.propagate(e); } Iterable failed = Iterables.filter(results, Status.Failure.class); Iterable successful = Iterables.filter(results, DataTreeCohortActor.Success.class); + + LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed); + successfulFromPrevious = successful; if (!Iterables.isEmpty(failed)) { changeStateFrom(currentState, State.FAILED); @@ -179,7 +207,7 @@ class CompositeDataTreeCohort { changeStateFrom(currentState, afterState); } - void changeStateFrom(State expected, State followup) { + void changeStateFrom(final State expected, final State followup) { Preconditions.checkState(state == expected); state = followup; }