* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.datastore;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
import akka.dispatch.Recover;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
+import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
/**
* <p/>
* It tracks current operation and list of cohorts which successfuly finished previous phase in
* case, if abort is necessary to invoke it only on cohort steps which are still active.
- *
*/
class CompositeDataTreeCohort {
private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
ABORTED
}
- static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
+ static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<>() {
@Override
public Failure recover(final Throwable error) {
return new Failure(error);
private final DataTreeCohortActorRegistry registry;
private final TransactionIdentifier txId;
- private final SchemaContext schema;
+ private final EffectiveModelContext schema;
+ private final Executor callbackExecutor;
private final Timeout timeout;
- private List<Success> successfulFromPrevious;
+ private @NonNull List<Success> successfulFromPrevious = List.of();
private State state = State.IDLE;
CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
- final SchemaContext schema, final Timeout timeout) {
- this.registry = Preconditions.checkNotNull(registry);
- this.txId = Preconditions.checkNotNull(transactionID);
- this.schema = Preconditions.checkNotNull(schema);
- this.timeout = Preconditions.checkNotNull(timeout);
+ final EffectiveModelContext schema, final Executor callbackExecutor, final Timeout timeout) {
+ this.registry = requireNonNull(registry);
+ txId = requireNonNull(transactionID);
+ this.schema = requireNonNull(schema);
+ this.callbackExecutor = requireNonNull(callbackExecutor);
+ this.timeout = requireNonNull(timeout);
}
void reset() {
throw new IllegalStateException("Unhandled state " + state);
}
- successfulFromPrevious = null;
+ successfulFromPrevious = List.of();
state = State.IDLE;
}
- void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
+ Optional<CompletionStage<Empty>> canCommit(final DataTreeCandidate tip) {
if (LOG.isTraceEnabled()) {
LOG.trace("{}: canCommit - candidate: {}", txId, tip);
} else {
final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
LOG.debug("{}: canCommit - messages: {}", txId, messages);
if (messages.isEmpty()) {
- successfulFromPrevious = ImmutableList.of();
+ successfulFromPrevious = List.of();
changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
- return;
+ return Optional.empty();
}
final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
}
changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
- processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
+ return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
}
- void preCommit() throws ExecutionException, TimeoutException {
+ Optional<CompletionStage<Empty>> preCommit() {
LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
- Preconditions.checkState(successfulFromPrevious != null);
if (successfulFromPrevious.isEmpty()) {
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
- return;
+ return Optional.empty();
}
final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
new DataTreeCohortActor.PreCommit(txId));
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
- processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
+ return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
}
- void commit() throws ExecutionException, TimeoutException {
+ Optional<CompletionStage<Empty>> commit() {
LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
if (successfulFromPrevious.isEmpty()) {
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
- return;
+ return Optional.empty();
}
- Preconditions.checkState(successfulFromPrevious != null);
final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
new DataTreeCohortActor.Commit(txId));
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
- processResponses(futures, State.COMMIT_SENT, State.COMMITED);
+ return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
}
- Optional<List<Future<Object>>> abort() {
+ Optional<CompletionStage<?>> abort() {
LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
state = State.ABORTED;
- if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) {
+ if (successfulFromPrevious.isEmpty()) {
return Optional.empty();
}
for (Success s : successfulFromPrevious) {
futures.add(Patterns.ask(s.getCohort(), message, timeout));
}
- return Optional.of(futures);
+
+ return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
}
private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
return ret;
}
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void processResponses(final List<Entry<ActorRef, Future<Object>>> futures, final State currentState,
- final State afterState) throws TimeoutException, ExecutionException {
+ private @NonNull CompletionStage<Empty> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
+ final State currentState, final State afterState) {
LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+ final CompletableFuture<Empty> returnFuture = new CompletableFuture<>();
+ Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
+ ExecutionContexts.global());
- final Iterable<Object> results;
- try {
- results = Await.result(Futures.sequence(Lists.transform(futures, e -> e.getValue()),
- ExecutionContexts.global()), timeout.duration());
- } catch (TimeoutException e) {
- successfulFromPrevious = null;
- LOG.debug("{}: processResponses - error from Future", txId, e);
-
- for (Entry<ActorRef, Future<Object>> f : futures) {
- if (!f.getValue().isCompleted()) {
- LOG.info("{}: actor {} failed to respond", txId, f.getKey());
- }
+ aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(final Throwable failure, final Iterable<Object> results) {
+ callbackExecutor.execute(
+ () -> processResponses(failure, results, currentState, afterState, returnFuture));
}
- throw e;
- } catch (ExecutionException e) {
- successfulFromPrevious = null;
- LOG.debug("{}: processResponses - error from Future", txId, e);
- throw e;
- } catch (Exception e) {
- successfulFromPrevious = null;
- LOG.debug("{}: processResponses - error from Future", txId, e);
- throw new ExecutionException(e);
+ }, ExecutionContexts.global());
+
+ return returnFuture;
+ }
+
+ private void processResponses(final Throwable failure, final Iterable<Object> results,
+ final State currentState, final State afterState, final CompletableFuture<Empty> resultFuture) {
+ if (failure != null) {
+ successfulFromPrevious = List.of();
+ resultFuture.completeExceptionally(failure);
+ return;
}
final Collection<Failure> failed = new ArrayList<>(1);
- final List<Success> successful = new ArrayList<>(futures.size());
+ final List<Success> successful = new ArrayList<>();
for (Object result : results) {
if (result instanceof DataTreeCohortActor.Success) {
successful.add((Success) result);
} else if (result instanceof Status.Failure) {
failed.add((Failure) result);
} else {
- LOG.warn("{}: unrecognized response {}, ignoring it", result);
+ LOG.warn("{}: unrecognized response {}, ignoring it", txId, result);
}
}
LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
- successfulFromPrevious = successful;
if (!failed.isEmpty()) {
changeStateFrom(currentState, State.FAILED);
final Iterator<Failure> it = failed.iterator();
while (it.hasNext()) {
firstEx.addSuppressed(it.next().cause());
}
- Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class);
- Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class);
- throw new ExecutionException(firstEx);
+
+ successfulFromPrevious = List.of();
+ resultFuture.completeExceptionally(firstEx);
+ } else {
+ successfulFromPrevious = successful;
+ changeStateFrom(currentState, afterState);
+ resultFuture.complete(Empty.value());
}
- changeStateFrom(currentState, afterState);
}
void changeStateFrom(final State expected, final State followup) {
- Preconditions.checkState(state == expected);
+ checkState(state == expected);
state = followup;
}
}