import akka.actor.Status.Failure;
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
import akka.dispatch.Recover;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
+import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
/**
private final DataTreeCohortActorRegistry registry;
private final TransactionIdentifier txId;
private final SchemaContext schema;
+ private final Executor callbackExecutor;
private final Timeout timeout;
- private List<Success> successfulFromPrevious;
+ @Nonnull
+ private List<Success> successfulFromPrevious = Collections.emptyList();
private State state = State.IDLE;
CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
- final SchemaContext schema, final Timeout timeout) {
+ final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
this.registry = Preconditions.checkNotNull(registry);
this.txId = Preconditions.checkNotNull(transactionID);
this.schema = Preconditions.checkNotNull(schema);
+ this.callbackExecutor = Preconditions.checkNotNull(callbackExecutor);
this.timeout = Preconditions.checkNotNull(timeout);
}
throw new IllegalStateException("Unhandled state " + state);
}
- successfulFromPrevious = null;
+ successfulFromPrevious = Collections.emptyList();
state = State.IDLE;
}
- void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
+ Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
if (LOG.isTraceEnabled()) {
LOG.trace("{}: canCommit - candidate: {}", txId, tip);
} else {
final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
LOG.debug("{}: canCommit - messages: {}", txId, messages);
if (messages.isEmpty()) {
- successfulFromPrevious = ImmutableList.of();
+ successfulFromPrevious = Collections.emptyList();
changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
- return;
+ return Optional.empty();
}
final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
}
changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
- processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
+ return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
}
- void preCommit() throws ExecutionException, TimeoutException {
+ Optional<CompletionStage<Void>> preCommit() {
LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
- Preconditions.checkState(successfulFromPrevious != null);
if (successfulFromPrevious.isEmpty()) {
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
- return;
+ return Optional.empty();
}
final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
new DataTreeCohortActor.PreCommit(txId));
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
- processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
+ return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
}
- void commit() throws ExecutionException, TimeoutException {
+ Optional<CompletionStage<Void>> commit() {
LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
if (successfulFromPrevious.isEmpty()) {
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
- return;
+ return Optional.empty();
}
- Preconditions.checkState(successfulFromPrevious != null);
final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
new DataTreeCohortActor.Commit(txId));
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
- processResponses(futures, State.COMMIT_SENT, State.COMMITED);
+ return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
}
- Optional<List<Future<Object>>> abort() {
+ Optional<CompletionStage<?>> abort() {
LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
state = State.ABORTED;
- if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) {
+ if (successfulFromPrevious.isEmpty()) {
return Optional.empty();
}
for (Success s : successfulFromPrevious) {
futures.add(Patterns.ask(s.getCohort(), message, timeout));
}
- return Optional.of(futures);
+
+ return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
}
private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
return ret;
}
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void processResponses(final List<Entry<ActorRef, Future<Object>>> futures, final State currentState,
- final State afterState) throws TimeoutException, ExecutionException {
+ @Nonnull
+ private CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
+ final State currentState, final State afterState) {
LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+ final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
+ Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
+ ExecutionContexts.global());
- final Iterable<Object> results;
- try {
- results = Await.result(Futures.sequence(Lists.transform(futures, Entry::getValue),
- ExecutionContexts.global()), timeout.duration());
- } catch (TimeoutException e) {
- successfulFromPrevious = null;
- LOG.debug("{}: processResponses - error from Future", txId, e);
-
- for (Entry<ActorRef, Future<Object>> f : futures) {
- if (!f.getValue().isCompleted()) {
- LOG.info("{}: actor {} failed to respond", txId, f.getKey());
- }
+ aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Object> results) {
+ callbackExecutor.execute(
+ () -> processResponses(failure, results, currentState, afterState, returnFuture));
}
- throw e;
- } catch (ExecutionException e) {
- successfulFromPrevious = null;
- LOG.debug("{}: processResponses - error from Future", txId, e);
- throw e;
- } catch (Exception e) {
- successfulFromPrevious = null;
- LOG.debug("{}: processResponses - error from Future", txId, e);
- throw new ExecutionException(e);
+ }, ExecutionContexts.global());
+
+ return returnFuture;
+ }
+
+ // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
+ // generic type is Void.
+ @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
+ private void processResponses(Throwable failure, Iterable<Object> results, State currentState, State afterState,
+ CompletableFuture<Void> resultFuture) {
+ if (failure != null) {
+ successfulFromPrevious = Collections.emptyList();
+ resultFuture.completeExceptionally(failure);
+ return;
}
final Collection<Failure> failed = new ArrayList<>(1);
- final List<Success> successful = new ArrayList<>(futures.size());
+ final List<Success> successful = new ArrayList<>();
for (Object result : results) {
if (result instanceof DataTreeCohortActor.Success) {
successful.add((Success) result);
LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
- successfulFromPrevious = successful;
if (!failed.isEmpty()) {
changeStateFrom(currentState, State.FAILED);
final Iterator<Failure> it = failed.iterator();
while (it.hasNext()) {
firstEx.addSuppressed(it.next().cause());
}
- Throwables.throwIfInstanceOf(firstEx, ExecutionException.class);
- Throwables.throwIfInstanceOf(firstEx, TimeoutException.class);
- throw new ExecutionException(firstEx);
+
+ successfulFromPrevious = Collections.emptyList();
+ resultFuture.completeExceptionally(firstEx);
+ } else {
+ successfulFromPrevious = successful;
+ changeStateFrom(currentState, afterState);
+ resultFuture.complete(null);
}
- changeStateFrom(currentState, afterState);
}
void changeStateFrom(final State expected, final State followup) {
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
}
CompositeDataTreeCohort createCohort(final SchemaContext schemaContext, final TransactionIdentifier txId,
- final Timeout commitStepTimeout) {
- return new CompositeDataTreeCohort(this, txId, schemaContext, commitStepTimeout);
+ final Executor callbackExecutor, final Timeout commitStepTimeout) {
+ return new CompositeDataTreeCohort(this, txId, schemaContext, callbackExecutor, commitStepTimeout);
}
}
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Iterables;
import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
processNextPendingTransaction();
}
- private void failPreCommit(final Exception cause) {
+ private void failPreCommit(final Throwable cause) {
shard.getShardMBean().incrementFailedTransactionsCount();
pendingTransactions.poll().cohort.failedPreCommit(cause);
processNextPendingTransaction();
final DataTreeCandidateTip candidate;
try {
candidate = tip.prepare(cohort.getDataTreeModification());
- cohort.userPreCommit(candidate);
- } catch (ExecutionException | TimeoutException | RuntimeException e) {
+ } catch (RuntimeException e) {
failPreCommit(e);
return;
}
- // Set the tip of the data tree.
- tip = Verify.verifyNotNull(candidate);
+ cohort.userPreCommit(candidate, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void noop) {
+ // Set the tip of the data tree.
+ tip = Verify.verifyNotNull(candidate);
- entry.lastAccess = readTime();
+ entry.lastAccess = readTime();
- pendingTransactions.remove();
- pendingCommits.add(entry);
+ pendingTransactions.remove();
+ pendingCommits.add(entry);
- LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+ LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
- cohort.successfulPreCommit(candidate);
+ cohort.successfulPreCommit(candidate);
- processNextPendingTransaction();
+ processNextPendingTransaction();
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ failPreCommit(failure);
+ }
+ });
}
private void failCommit(final Exception cause) {
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
// FIXME: propagate journal index
- pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+ pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> {
+ LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+ notifyListeners(candidate);
- LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
- notifyListeners(candidate);
-
- processNextPending();
+ processNextPending();
+ });
}
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
@Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
- cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+ cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
+ COMMIT_STEP_TIMEOUT));
pendingTransactions.add(new CommitEntry(cohort, readTime()));
return cohort;
}
try {
tip.validate(cohort.getDataTreeModification());
DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
- cohort.userPreCommit(candidate);
cohort.setNewCandidate(candidate);
tip = candidate;
- } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+ } catch (RuntimeException | DataValidationFailedException e) {
LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
cohort.reportFailure(e);
}
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.Futures;
-import akka.dispatch.OnComplete;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
-import java.util.List;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletionStage;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
candidate = null;
state = State.ABORTED;
- final Optional<List<Future<Object>>> maybeAborts = userCohorts.abort();
+ final Optional<CompletionStage<?>> maybeAborts = userCohorts.abort();
if (!maybeAborts.isPresent()) {
abortCallback.onSuccess(null);
return;
}
- final Future<Iterable<Object>> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global());
- if (aborts.isCompleted()) {
- abortCallback.onSuccess(null);
- return;
- }
-
- aborts.onComplete(new OnComplete<Iterable<Object>>() {
- @Override
- public void onComplete(final Throwable failure, final Iterable<Object> objs) {
- if (failure != null) {
- abortCallback.onFailure(failure);
- } else {
- abortCallback.onSuccess(null);
- }
+ maybeAborts.get().whenComplete((noop, failure) -> {
+ if (failure != null) {
+ abortCallback.onFailure(failure);
+ } else {
+ abortCallback.onSuccess(null);
}
- }, ExecutionContexts.global());
+ });
}
@Override
* any failure to validate is propagated before we record the transaction.
*
* @param dataTreeCandidate {@link DataTreeCandidate} under consideration
- * @throws ExecutionException if the operation fails
- * @throws TimeoutException if the operation times out
+ * @param futureCallback the callback to invoke on completion, which may be immediate or async.
*/
- // FIXME: this should be asynchronous
- void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException {
+ void userPreCommit(final DataTreeCandidate dataTreeCandidate, final FutureCallback<Void> futureCallback) {
userCohorts.reset();
- userCohorts.canCommit(dataTreeCandidate);
- userCohorts.preCommit();
+
+ final Optional<CompletionStage<Void>> maybeCanCommitFuture = userCohorts.canCommit(dataTreeCandidate);
+ if (!maybeCanCommitFuture.isPresent()) {
+ doUserPreCommit(futureCallback);
+ return;
+ }
+
+ maybeCanCommitFuture.get().whenComplete((noop, failure) -> {
+ if (failure != null) {
+ futureCallback.onFailure(failure);
+ } else {
+ doUserPreCommit(futureCallback);
+ }
+ });
+ }
+
+ private void doUserPreCommit(final FutureCallback<Void> futureCallback) {
+ final Optional<CompletionStage<Void>> maybePreCommitFuture = userCohorts.preCommit();
+ if (!maybePreCommitFuture.isPresent()) {
+ futureCallback.onSuccess(null);
+ return;
+ }
+
+ maybePreCommitFuture.get().whenComplete((noop, failure) -> {
+ if (failure != null) {
+ futureCallback.onFailure(failure);
+ } else {
+ futureCallback.onSuccess(null);
+ }
+ });
}
void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
}
- void failedPreCommit(final Exception cause) {
+ void failedPreCommit(final Throwable cause) {
if (LOG.isTraceEnabled()) {
LOG.trace("Transaction {} failed to prepare", transaction, cause);
} else {
switchState(State.FAILED).onFailure(cause);
}
- void successfulCommit(final UnsignedLong journalIndex) {
- try {
- userCohorts.commit();
- } catch (TimeoutException | ExecutionException e) {
- // We are probably dead, depending on what the cohorts end up doing
- LOG.error("User cohorts failed to commit", e);
+ void successfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
+ final Optional<CompletionStage<Void>> maybeCommitFuture = userCohorts.commit();
+ if (!maybeCommitFuture.isPresent()) {
+ finishSuccessfulCommit(journalIndex, onComplete);
+ return;
}
+ maybeCommitFuture.get().whenComplete((noop, failure) -> {
+ if (failure != null) {
+ LOG.error("User cohorts failed to commit", failure);
+ }
+
+ finishSuccessfulCommit(journalIndex, onComplete);
+ });
+ }
+
+ private void finishSuccessfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
switchState(State.COMMITTED).onSuccess(journalIndex);
+ onComplete.run();
}
void failedCommit(final Exception cause) {
import static org.junit.Assert.assertSame;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
-import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import scala.concurrent.Promise;
/**
* Unit tests for SimpleShardDataTreeCohort.
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
- doNothing().when(mockUserCohorts).commit();
+ doReturn(Optional.empty()).when(mockUserCohorts).commit();
doReturn(Optional.empty()).when(mockUserCohorts).abort();
cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(),
final DataTreeCandidateTip candidate = preCommitSuccess();
doAnswer(invocation -> {
- invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCommit(UnsignedLong.valueOf(0));
+ invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCommit(UnsignedLong.valueOf(0),
+ () -> { });
return null;
}).when(mockShardDataTree).startCommit(cohort, candidate);
public void testAbortWithCohorts() throws Exception {
doReturn(true).when(mockShardDataTree).startAbort(cohort);
- final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
- doReturn(Optional.of(Collections.singletonList(cohortFuture.future()))).when(mockUserCohorts).abort();
+ doReturn(Optional.of(CompletableFuture.completedFuture(null))).when(mockUserCohorts).abort();
final Future<?> abortFuture = abort(cohort);
- cohortFuture.success(Collections.emptyList());
-
abortFuture.get();
verify(mockShardDataTree).startAbort(cohort);
}