X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FCompositeDataTreeCohort.java;h=0ef49b6244c33a8a86fb7adec36106117c1f6175;hp=e8c0567e0610968ceba4eaba50e6045e625ef6f5;hb=a6af137c30470b86d4bc624d4c48cb686495a182;hpb=5e7cf2452ef634dc934a3ea5a2dd95059fbab68c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java index e8c0567e06..0ef49b6244 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java @@ -13,22 +13,25 @@ import akka.actor.Status; import akka.actor.Status.Failure; import akka.dispatch.ExecutionContexts; import akka.dispatch.Futures; +import akka.dispatch.OnComplete; import akka.dispatch.Recover; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success; @@ -36,7 +39,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Await; +import scala.compat.java8.FutureConverters; import scala.concurrent.Future; /** @@ -98,16 +101,19 @@ class CompositeDataTreeCohort { private final DataTreeCohortActorRegistry registry; private final TransactionIdentifier txId; private final SchemaContext schema; + private final Executor callbackExecutor; private final Timeout timeout; - private List successfulFromPrevious; + @Nonnull + private List successfulFromPrevious = Collections.emptyList(); private State state = State.IDLE; CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID, - final SchemaContext schema, final Timeout timeout) { + final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) { this.registry = Preconditions.checkNotNull(registry); this.txId = Preconditions.checkNotNull(transactionID); this.schema = Preconditions.checkNotNull(schema); + this.callbackExecutor = Preconditions.checkNotNull(callbackExecutor); this.timeout = Preconditions.checkNotNull(timeout); } @@ -129,11 +135,11 @@ class CompositeDataTreeCohort { throw new IllegalStateException("Unhandled state " + state); } - successfulFromPrevious = null; + successfulFromPrevious = Collections.emptyList(); state = State.IDLE; } - void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException { + Optional> canCommit(final DataTreeCandidate tip) { if (LOG.isTraceEnabled()) { LOG.trace("{}: canCommit - candidate: {}", txId, tip); } else { @@ -143,9 +149,9 @@ class CompositeDataTreeCohort { final List messages = registry.createCanCommitMessages(txId, tip, schema); LOG.debug("{}: canCommit - messages: {}", txId, messages); if (messages.isEmpty()) { - successfulFromPrevious = ImmutableList.of(); + successfulFromPrevious = Collections.emptyList(); changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL); - return; + return Optional.empty(); } final List>> futures = new ArrayList<>(messages.size()); @@ -158,43 +164,41 @@ class CompositeDataTreeCohort { } changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT); - processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL); + return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL)); } - void preCommit() throws ExecutionException, TimeoutException { + Optional> preCommit() { LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious); - Preconditions.checkState(successfulFromPrevious != null); if (successfulFromPrevious.isEmpty()) { changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL); - return; + return Optional.empty(); } final List>> futures = sendMessageToSuccessful( new DataTreeCohortActor.PreCommit(txId)); changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT); - processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL); + return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL)); } - void commit() throws ExecutionException, TimeoutException { + Optional> commit() { LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious); if (successfulFromPrevious.isEmpty()) { changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED); - return; + return Optional.empty(); } - Preconditions.checkState(successfulFromPrevious != null); final List>> futures = sendMessageToSuccessful( new DataTreeCohortActor.Commit(txId)); changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT); - processResponses(futures, State.COMMIT_SENT, State.COMMITED); + return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED)); } - Optional>> abort() { + Optional> abort() { LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious); state = State.ABORTED; - if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) { + if (successfulFromPrevious.isEmpty()) { return Optional.empty(); } @@ -203,7 +207,8 @@ class CompositeDataTreeCohort { for (Success s : successfulFromPrevious) { futures.add(Patterns.ask(s.getCohort(), message, timeout)); } - return Optional.of(futures); + + return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global()))); } private List>> sendMessageToSuccessful(final Object message) { @@ -217,37 +222,38 @@ class CompositeDataTreeCohort { return ret; } - @SuppressWarnings("checkstyle:IllegalCatch") - private void processResponses(final List>> futures, final State currentState, - final State afterState) throws TimeoutException, ExecutionException { + @Nonnull + private CompletionStage processResponses(final List>> futures, + final State currentState, final State afterState) { LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState); + final CompletableFuture returnFuture = new CompletableFuture<>(); + Future> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue), + ExecutionContexts.global()); - final Iterable results; - try { - results = Await.result(Futures.sequence(Lists.transform(futures, Entry::getValue), - ExecutionContexts.global()), timeout.duration()); - } catch (TimeoutException e) { - successfulFromPrevious = null; - LOG.debug("{}: processResponses - error from Future", txId, e); - - for (Entry> f : futures) { - if (!f.getValue().isCompleted()) { - LOG.info("{}: actor {} failed to respond", txId, f.getKey()); - } + aggregateFuture.onComplete(new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable results) { + callbackExecutor.execute( + () -> processResponses(failure, results, currentState, afterState, returnFuture)); } - throw e; - } catch (ExecutionException e) { - successfulFromPrevious = null; - LOG.debug("{}: processResponses - error from Future", txId, e); - throw e; - } catch (Exception e) { - successfulFromPrevious = null; - LOG.debug("{}: processResponses - error from Future", txId, e); - throw new ExecutionException(e); + }, ExecutionContexts.global()); + + return returnFuture; + } + + // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the + // generic type is Void. + @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION") + private void processResponses(Throwable failure, Iterable results, State currentState, State afterState, + CompletableFuture resultFuture) { + if (failure != null) { + successfulFromPrevious = Collections.emptyList(); + resultFuture.completeExceptionally(failure); + return; } final Collection failed = new ArrayList<>(1); - final List successful = new ArrayList<>(futures.size()); + final List successful = new ArrayList<>(); for (Object result : results) { if (result instanceof DataTreeCohortActor.Success) { successful.add((Success) result); @@ -260,7 +266,6 @@ class CompositeDataTreeCohort { LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed); - successfulFromPrevious = successful; if (!failed.isEmpty()) { changeStateFrom(currentState, State.FAILED); final Iterator it = failed.iterator(); @@ -268,11 +273,14 @@ class CompositeDataTreeCohort { while (it.hasNext()) { firstEx.addSuppressed(it.next().cause()); } - Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class); - Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class); - throw new ExecutionException(firstEx); + + successfulFromPrevious = Collections.emptyList(); + resultFuture.completeExceptionally(firstEx); + } else { + successfulFromPrevious = successful; + changeStateFrom(currentState, afterState); + resultFuture.complete(null); } - changeStateFrom(currentState, afterState); } void changeStateFrom(final State expected, final State followup) {