X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FCompositeDataTreeCohort.java;h=006555f1f8b97d9007cbe0ba19865e2bcb8f3904;hp=8115473a0565fb2c3865f4bc03e7216a8029a579;hb=2634ed7138a343f051ff6452ccc7edd3abfc0c3a;hpb=a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java index 8115473a05..006555f1f8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java @@ -13,7 +13,6 @@ import akka.actor.Status.Failure; import akka.dispatch.ExecutionContexts; import akka.dispatch.Futures; import akka.dispatch.Recover; -import akka.japi.Function; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; @@ -33,9 +32,8 @@ import scala.concurrent.Await; import scala.concurrent.Future; /** - * * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort. - * + *

* It tracks current operation and list of cohorts which successfuly finished previous phase in * case, if abort is necessary to invoke it only on cohort steps which are still active. * @@ -72,14 +70,11 @@ class CompositeDataTreeCohort { */ COMMITED, /** - * Some of cohorts responsed back with unsuccessful message. - * + * Some of cohorts responded back with unsuccessful message. */ FAILED, /** - * * Abort message was send to all cohorts which responded with success previously. - * */ ABORTED } @@ -107,17 +102,29 @@ class CompositeDataTreeCohort { this.timeout = Preconditions.checkNotNull(timeout); } + void reset() { + switch (state) { + case CAN_COMMIT_SENT: + case CAN_COMMIT_SUCCESSFUL: + case PRE_COMMIT_SENT: + case PRE_COMMIT_SUCCESSFUL: + case COMMIT_SENT: + abort(); + break; + default : + break; + } + + successfulFromPrevious = null; + state = State.IDLE; + } + void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException { Collection messages = registry.createCanCommitMessages(txId, tip, schema); // FIXME: Optimize empty collection list with pre-created futures, containing success. - Future> canCommitsFuture = - Futures.traverse(messages, new Function>() { - @Override - public Future apply(final CanCommit input) { - return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE, - ExecutionContexts.global()); - } - }, ExecutionContexts.global()); + Future> canCommitsFuture = Futures.traverse(messages, + input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE, + ExecutionContexts.global()), ExecutionContexts.global()); changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT); processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL); } @@ -137,7 +144,8 @@ class CompositeDataTreeCohort { } Optional>> abort() { - if (successfulFromPrevious != null) { + state = State.ABORTED; + if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) { return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId))); } @@ -145,18 +153,13 @@ class CompositeDataTreeCohort { } private Future> sendMesageToSuccessful(final Object message) { - return Futures.traverse(successfulFromPrevious, new Function>() { - - @Override - public Future apply(final DataTreeCohortActor.Success cohortResponse) throws Exception { - return Patterns.ask(cohortResponse.getCohort(), message, timeout); - } - - }, ExecutionContexts.global()); + return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask( + cohortResponse.getCohort(), message, timeout), ExecutionContexts.global()); } - private void processResponses(final Future> resultsFuture, final State currentState, final State afterState) - throws TimeoutException, ExecutionException { + @SuppressWarnings("checkstyle:IllegalCatch") + private void processResponses(final Future> resultsFuture, final State currentState, + final State afterState) throws TimeoutException, ExecutionException { final Iterable results; try { results = Await.result(resultsFuture, timeout.duration());