package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
import akka.dispatch.Recover;
-import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.Await;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
/**
- *
* Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
- *
+ * <p/>
* It tracks current operation and list of cohorts which successfuly finished previous phase in
* case, if abort is necessary to invoke it only on cohort steps which are still active.
*
*/
class CompositeDataTreeCohort {
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
private enum State {
/**
*/
COMMITED,
/**
- * Some of cohorts responsed back with unsuccessful message.
- *
+ * Some of cohorts responded back with unsuccessful message.
*/
FAILED,
/**
- *
* Abort message was send to all cohorts which responded with success previously.
- *
*/
ABORTED
}
- protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
+ static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
@Override
- public Failure recover(final Throwable error) throws Throwable {
+ public Failure recover(final Throwable error) {
return new Failure(error);
}
};
-
private final DataTreeCohortActorRegistry registry;
private final TransactionIdentifier txId;
private final SchemaContext schema;
+ private final Executor callbackExecutor;
private final Timeout timeout;
- private Iterable<Success> successfulFromPrevious;
+
+ @Nonnull
+ private List<Success> successfulFromPrevious = Collections.emptyList();
private State state = State.IDLE;
CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
- final SchemaContext schema, final Timeout timeout) {
+ final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
this.registry = Preconditions.checkNotNull(registry);
this.txId = Preconditions.checkNotNull(transactionID);
this.schema = Preconditions.checkNotNull(schema);
+ this.callbackExecutor = Preconditions.checkNotNull(callbackExecutor);
this.timeout = Preconditions.checkNotNull(timeout);
}
- void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
- Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
- // FIXME: Optimize empty collection list with pre-created futures, containing success.
- Future<Iterable<Object>> canCommitsFuture =
- Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
- @Override
- public Future<Object> apply(final CanCommit input) {
- return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
- ExecutionContexts.global());
- }
- }, ExecutionContexts.global());
+ void reset() {
+ switch (state) {
+ case CAN_COMMIT_SENT:
+ case CAN_COMMIT_SUCCESSFUL:
+ case PRE_COMMIT_SENT:
+ case PRE_COMMIT_SUCCESSFUL:
+ case COMMIT_SENT:
+ abort();
+ break;
+ case ABORTED:
+ case COMMITED:
+ case FAILED:
+ case IDLE:
+ break;
+ default:
+ throw new IllegalStateException("Unhandled state " + state);
+ }
+
+ successfulFromPrevious = Collections.emptyList();
+ state = State.IDLE;
+ }
+
+ Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{}: canCommit - candidate: {}", txId, tip);
+ } else {
+ LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
+ }
+
+ final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+ LOG.debug("{}: canCommit - messages: {}", txId, messages);
+ if (messages.isEmpty()) {
+ successfulFromPrevious = Collections.emptyList();
+ changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
+ return Optional.empty();
+ }
+
+ final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
+ for (CanCommit message : messages) {
+ final ActorRef actor = message.getCohort();
+ final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
+ ExecutionContexts.global());
+ LOG.trace("{}: requesting canCommit from {}", txId, actor);
+ futures.add(new SimpleImmutableEntry<>(actor, future));
+ }
+
changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
- processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
+ return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
}
- void preCommit() throws ExecutionException, TimeoutException {
- Preconditions.checkState(successfulFromPrevious != null);
- Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
+ Optional<CompletionStage<Void>> preCommit() {
+ LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
+ if (successfulFromPrevious.isEmpty()) {
+ changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
+ return Optional.empty();
+ }
+
+ final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
+ new DataTreeCohortActor.PreCommit(txId));
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
- processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
+ return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
}
- void commit() throws ExecutionException, TimeoutException {
- Preconditions.checkState(successfulFromPrevious != null);
- Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
+ Optional<CompletionStage<Void>> commit() {
+ LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+ if (successfulFromPrevious.isEmpty()) {
+ changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
+ return Optional.empty();
+ }
+
+ final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
+ new DataTreeCohortActor.Commit(txId));
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
- processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
+ return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
}
- Optional<Future<Iterable<Object>>> abort() {
- if (successfulFromPrevious != null) {
- return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
+ Optional<CompletionStage<?>> abort() {
+ LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
+ state = State.ABORTED;
+ if (successfulFromPrevious.isEmpty()) {
+ return Optional.empty();
+ }
+
+ final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
+ final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
+ for (Success s : successfulFromPrevious) {
+ futures.add(Patterns.ask(s.getCohort(), message, timeout));
}
- return Optional.empty();
+ return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
+ }
+
+ private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
+ LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
+
+ final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
+ for (Success s : successfulFromPrevious) {
+ final ActorRef actor = s.getCohort();
+ ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
+ }
+ return ret;
}
- private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
- return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
+ @Nonnull
+ private CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
+ final State currentState, final State afterState) {
+ LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+ final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
+ Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
+ ExecutionContexts.global());
+ aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
- public Future<Object> apply(final DataTreeCohortActor.Success cohortResponse) throws Exception {
- return Patterns.ask(cohortResponse.getCohort(), message, timeout);
+ public void onComplete(Throwable failure, Iterable<Object> results) {
+ callbackExecutor.execute(
+ () -> processResponses(failure, results, currentState, afterState, returnFuture));
}
-
}, ExecutionContexts.global());
+
+ return returnFuture;
}
- private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState, final State afterState)
- throws TimeoutException, ExecutionException {
- final Iterable<Object> results;
- try {
- results = Await.result(resultsFuture, timeout.duration());
- } catch (Exception e) {
- successfulFromPrevious = null;
- Throwables.propagateIfInstanceOf(e, TimeoutException.class);
- throw Throwables.propagate(e);
+ // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
+ // generic type is Void.
+ @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
+ private void processResponses(Throwable failure, Iterable<Object> results, State currentState, State afterState,
+ CompletableFuture<Void> resultFuture) {
+ if (failure != null) {
+ successfulFromPrevious = Collections.emptyList();
+ resultFuture.completeExceptionally(failure);
+ return;
+ }
+
+ final Collection<Failure> failed = new ArrayList<>(1);
+ final List<Success> successful = new ArrayList<>();
+ for (Object result : results) {
+ if (result instanceof DataTreeCohortActor.Success) {
+ successful.add((Success) result);
+ } else if (result instanceof Status.Failure) {
+ failed.add((Failure) result);
+ } else {
+ LOG.warn("{}: unrecognized response {}, ignoring it", result);
+ }
}
- Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
- Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
- successfulFromPrevious = successful;
- if (!Iterables.isEmpty(failed)) {
+
+ LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
+
+ if (!failed.isEmpty()) {
changeStateFrom(currentState, State.FAILED);
- Iterator<Failure> it = failed.iterator();
- Throwable firstEx = it.next().cause();
+ final Iterator<Failure> it = failed.iterator();
+ final Throwable firstEx = it.next().cause();
while (it.hasNext()) {
firstEx.addSuppressed(it.next().cause());
}
- Throwables.propagateIfPossible(firstEx, ExecutionException.class);
- Throwables.propagateIfPossible(firstEx, TimeoutException.class);
- throw Throwables.propagate(firstEx);
+
+ successfulFromPrevious = Collections.emptyList();
+ resultFuture.completeExceptionally(firstEx);
+ } else {
+ successfulFromPrevious = successful;
+ changeStateFrom(currentState, afterState);
+ resultFuture.complete(null);
}
- changeStateFrom(currentState, afterState);
}
void changeStateFrom(final State expected, final State followup) {