From: Tom Pantelis Date: Fri, 5 Jan 2018 01:04:41 +0000 (-0500) Subject: CONTROLLER-1641: Handle commit cohorts async X-Git-Tag: release/oxygen~13 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f81bccec7ac422dbcfdfba70dcfa22f9824b8e4c CONTROLLER-1641: Handle commit cohorts async Modified CompositeDataTreeCohort to handle response messages from the DataTreeCohortActor async. The canCommit, preCommit, and commit methods now return an Optional CompletionStage that is present when there are commit cohorts. On completion of the cohort actor Futures, a Runnable is executed in the shard actor to process the responses and complete the returned CompletionStage. The call sites in SimpleShardDataTreeCohort execute their own callbacks on completion. Change-Id: Ib1a22e059dabfe87a1ee141baae655601f9ce700 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java index c4afcfd4d8..0ef49b6244 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java @@ -13,22 +13,25 @@ import akka.actor.Status; import akka.actor.Status.Failure; import akka.dispatch.ExecutionContexts; import akka.dispatch.Futures; +import akka.dispatch.OnComplete; import akka.dispatch.Recover; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success; @@ -36,7 +39,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Await; +import scala.compat.java8.FutureConverters; import scala.concurrent.Future; /** @@ -98,16 +101,19 @@ class CompositeDataTreeCohort { private final DataTreeCohortActorRegistry registry; private final TransactionIdentifier txId; private final SchemaContext schema; + private final Executor callbackExecutor; private final Timeout timeout; - private List successfulFromPrevious; + @Nonnull + private List successfulFromPrevious = Collections.emptyList(); private State state = State.IDLE; CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID, - final SchemaContext schema, final Timeout timeout) { + final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) { this.registry = Preconditions.checkNotNull(registry); this.txId = Preconditions.checkNotNull(transactionID); this.schema = Preconditions.checkNotNull(schema); + this.callbackExecutor = Preconditions.checkNotNull(callbackExecutor); this.timeout = Preconditions.checkNotNull(timeout); } @@ -129,11 +135,11 @@ class CompositeDataTreeCohort { throw new IllegalStateException("Unhandled state " + state); } - successfulFromPrevious = null; + successfulFromPrevious = Collections.emptyList(); state = State.IDLE; } - void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException { + Optional> canCommit(final DataTreeCandidate tip) { if (LOG.isTraceEnabled()) { LOG.trace("{}: canCommit - candidate: {}", txId, tip); } else { @@ -143,9 +149,9 @@ class CompositeDataTreeCohort { final List messages = registry.createCanCommitMessages(txId, tip, schema); LOG.debug("{}: canCommit - messages: {}", txId, messages); if (messages.isEmpty()) { - successfulFromPrevious = ImmutableList.of(); + successfulFromPrevious = Collections.emptyList(); changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL); - return; + return Optional.empty(); } final List>> futures = new ArrayList<>(messages.size()); @@ -158,43 +164,41 @@ class CompositeDataTreeCohort { } changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT); - processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL); + return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL)); } - void preCommit() throws ExecutionException, TimeoutException { + Optional> preCommit() { LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious); - Preconditions.checkState(successfulFromPrevious != null); if (successfulFromPrevious.isEmpty()) { changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL); - return; + return Optional.empty(); } final List>> futures = sendMessageToSuccessful( new DataTreeCohortActor.PreCommit(txId)); changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT); - processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL); + return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL)); } - void commit() throws ExecutionException, TimeoutException { + Optional> commit() { LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious); if (successfulFromPrevious.isEmpty()) { changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED); - return; + return Optional.empty(); } - Preconditions.checkState(successfulFromPrevious != null); final List>> futures = sendMessageToSuccessful( new DataTreeCohortActor.Commit(txId)); changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT); - processResponses(futures, State.COMMIT_SENT, State.COMMITED); + return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED)); } - Optional>> abort() { + Optional> abort() { LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious); state = State.ABORTED; - if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) { + if (successfulFromPrevious.isEmpty()) { return Optional.empty(); } @@ -203,7 +207,8 @@ class CompositeDataTreeCohort { for (Success s : successfulFromPrevious) { futures.add(Patterns.ask(s.getCohort(), message, timeout)); } - return Optional.of(futures); + + return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global()))); } private List>> sendMessageToSuccessful(final Object message) { @@ -217,37 +222,38 @@ class CompositeDataTreeCohort { return ret; } - @SuppressWarnings("checkstyle:IllegalCatch") - private void processResponses(final List>> futures, final State currentState, - final State afterState) throws TimeoutException, ExecutionException { + @Nonnull + private CompletionStage processResponses(final List>> futures, + final State currentState, final State afterState) { LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState); + final CompletableFuture returnFuture = new CompletableFuture<>(); + Future> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue), + ExecutionContexts.global()); - final Iterable results; - try { - results = Await.result(Futures.sequence(Lists.transform(futures, Entry::getValue), - ExecutionContexts.global()), timeout.duration()); - } catch (TimeoutException e) { - successfulFromPrevious = null; - LOG.debug("{}: processResponses - error from Future", txId, e); - - for (Entry> f : futures) { - if (!f.getValue().isCompleted()) { - LOG.info("{}: actor {} failed to respond", txId, f.getKey()); - } + aggregateFuture.onComplete(new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable results) { + callbackExecutor.execute( + () -> processResponses(failure, results, currentState, afterState, returnFuture)); } - throw e; - } catch (ExecutionException e) { - successfulFromPrevious = null; - LOG.debug("{}: processResponses - error from Future", txId, e); - throw e; - } catch (Exception e) { - successfulFromPrevious = null; - LOG.debug("{}: processResponses - error from Future", txId, e); - throw new ExecutionException(e); + }, ExecutionContexts.global()); + + return returnFuture; + } + + // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the + // generic type is Void. + @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION") + private void processResponses(Throwable failure, Iterable results, State currentState, State afterState, + CompletableFuture resultFuture) { + if (failure != null) { + successfulFromPrevious = Collections.emptyList(); + resultFuture.completeExceptionally(failure); + return; } final Collection failed = new ArrayList<>(1); - final List successful = new ArrayList<>(futures.size()); + final List successful = new ArrayList<>(); for (Object result : results) { if (result instanceof DataTreeCohortActor.Success) { successful.add((Success) result); @@ -260,7 +266,6 @@ class CompositeDataTreeCohort { LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed); - successfulFromPrevious = successful; if (!failed.isEmpty()) { changeStateFrom(currentState, State.FAILED); final Iterator it = failed.iterator(); @@ -268,11 +273,14 @@ class CompositeDataTreeCohort { while (it.hasNext()) { firstEx.addSuppressed(it.next().cause()); } - Throwables.throwIfInstanceOf(firstEx, ExecutionException.class); - Throwables.throwIfInstanceOf(firstEx, TimeoutException.class); - throw new ExecutionException(firstEx); + + successfulFromPrevious = Collections.emptyList(); + resultFuture.completeExceptionally(firstEx); + } else { + successfulFromPrevious = successful; + changeStateFrom(currentState, afterState); + resultFuture.complete(null); } - changeStateFrom(currentState, afterState); } void changeStateFrom(final State expected, final State followup) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java index 572abeec7d..1dd0f2856b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; @@ -219,7 +220,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree { } CompositeDataTreeCohort createCohort(final SchemaContext schemaContext, final TransactionIdentifier txId, - final Timeout commitStepTimeout) { - return new CompositeDataTreeCohort(this, txId, schemaContext, commitStepTimeout); + final Executor callbackExecutor, final Timeout commitStepTimeout) { + return new CompositeDataTreeCohort(this, txId, schemaContext, callbackExecutor, commitStepTimeout); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 9aca517bb1..a155e5e0e0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Iterables; import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; @@ -31,7 +32,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -808,7 +808,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingTransaction(); } - private void failPreCommit(final Exception cause) { + private void failPreCommit(final Throwable cause) { shard.getShardMBean().incrementFailedTransactionsCount(); pendingTransactions.poll().cohort.failedPreCommit(cause); processNextPendingTransaction(); @@ -827,25 +827,34 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeCandidateTip candidate; try { candidate = tip.prepare(cohort.getDataTreeModification()); - cohort.userPreCommit(candidate); - } catch (ExecutionException | TimeoutException | RuntimeException e) { + } catch (RuntimeException e) { failPreCommit(e); return; } - // Set the tip of the data tree. - tip = Verify.verifyNotNull(candidate); + cohort.userPreCommit(candidate, new FutureCallback() { + @Override + public void onSuccess(final Void noop) { + // Set the tip of the data tree. + tip = Verify.verifyNotNull(candidate); - entry.lastAccess = readTime(); + entry.lastAccess = readTime(); - pendingTransactions.remove(); - pendingCommits.add(entry); + pendingTransactions.remove(); + pendingCommits.add(entry); - LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); + LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); - cohort.successfulPreCommit(candidate); + cohort.successfulPreCommit(candidate); - processNextPendingTransaction(); + processNextPendingTransaction(); + } + + @Override + public void onFailure(final Throwable failure) { + failPreCommit(failure); + } + }); } private void failCommit(final Exception cause) { @@ -878,12 +887,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); // FIXME: propagate journal index - pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO); + pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> { + LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); + notifyListeners(candidate); - LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); - notifyListeners(candidate); - - processNextPending(); + processNextPending(); + }); } void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { @@ -958,7 +967,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, - cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); + cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable), + COMMIT_STEP_TIMEOUT)); pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } @@ -1142,11 +1152,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { try { tip.validate(cohort.getDataTreeModification()); DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification()); - cohort.userPreCommit(candidate); cohort.setNewCandidate(candidate); tip = candidate; - } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) { + } catch (RuntimeException | DataValidationFailedException e) { LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e); cohort.reportFailure(e); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java index cf9948cf03..3104cb4972 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -7,25 +7,19 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.dispatch.ExecutionContexts; -import akka.dispatch.Futures; -import akka.dispatch.OnComplete; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.CompletionStage; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Future; final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class); @@ -116,28 +110,19 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { candidate = null; state = State.ABORTED; - final Optional>> maybeAborts = userCohorts.abort(); + final Optional> maybeAborts = userCohorts.abort(); if (!maybeAborts.isPresent()) { abortCallback.onSuccess(null); return; } - final Future> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global()); - if (aborts.isCompleted()) { - abortCallback.onSuccess(null); - return; - } - - aborts.onComplete(new OnComplete>() { - @Override - public void onComplete(final Throwable failure, final Iterable objs) { - if (failure != null) { - abortCallback.onFailure(failure); - } else { - abortCallback.onSuccess(null); - } + maybeAborts.get().whenComplete((noop, failure) -> { + if (failure != null) { + abortCallback.onFailure(failure); + } else { + abortCallback.onSuccess(null); } - }, ExecutionContexts.global()); + }); } @Override @@ -180,14 +165,40 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { * any failure to validate is propagated before we record the transaction. * * @param dataTreeCandidate {@link DataTreeCandidate} under consideration - * @throws ExecutionException if the operation fails - * @throws TimeoutException if the operation times out + * @param futureCallback the callback to invoke on completion, which may be immediate or async. */ - // FIXME: this should be asynchronous - void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException { + void userPreCommit(final DataTreeCandidate dataTreeCandidate, final FutureCallback futureCallback) { userCohorts.reset(); - userCohorts.canCommit(dataTreeCandidate); - userCohorts.preCommit(); + + final Optional> maybeCanCommitFuture = userCohorts.canCommit(dataTreeCandidate); + if (!maybeCanCommitFuture.isPresent()) { + doUserPreCommit(futureCallback); + return; + } + + maybeCanCommitFuture.get().whenComplete((noop, failure) -> { + if (failure != null) { + futureCallback.onFailure(failure); + } else { + doUserPreCommit(futureCallback); + } + }); + } + + private void doUserPreCommit(final FutureCallback futureCallback) { + final Optional> maybePreCommitFuture = userCohorts.preCommit(); + if (!maybePreCommitFuture.isPresent()) { + futureCallback.onSuccess(null); + return; + } + + maybePreCommitFuture.get().whenComplete((noop, failure) -> { + if (failure != null) { + futureCallback.onFailure(failure); + } else { + futureCallback.onSuccess(null); + } + }); } void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) { @@ -196,7 +207,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate); } - void failedPreCommit(final Exception cause) { + void failedPreCommit(final Throwable cause) { if (LOG.isTraceEnabled()) { LOG.trace("Transaction {} failed to prepare", transaction, cause); } else { @@ -207,15 +218,25 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { switchState(State.FAILED).onFailure(cause); } - void successfulCommit(final UnsignedLong journalIndex) { - try { - userCohorts.commit(); - } catch (TimeoutException | ExecutionException e) { - // We are probably dead, depending on what the cohorts end up doing - LOG.error("User cohorts failed to commit", e); + void successfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) { + final Optional> maybeCommitFuture = userCohorts.commit(); + if (!maybeCommitFuture.isPresent()) { + finishSuccessfulCommit(journalIndex, onComplete); + return; } + maybeCommitFuture.get().whenComplete((noop, failure) -> { + if (failure != null) { + LOG.error("User cohorts failed to commit", failure); + } + + finishSuccessfulCommit(journalIndex, onComplete); + }); + } + + private void finishSuccessfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) { switchState(State.COMMITTED).onSuccess(journalIndex); + onComplete.run(); } void failedCommit(final Exception cause) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java index 59933f8744..4550894fed 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertSame; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -19,7 +18,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -33,7 +31,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import scala.concurrent.Promise; /** * Unit tests for SimpleShardDataTreeCohort. @@ -59,7 +56,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest { public void setup() throws Exception { MockitoAnnotations.initMocks(this); - doNothing().when(mockUserCohorts).commit(); + doReturn(Optional.empty()).when(mockUserCohorts).commit(); doReturn(Optional.empty()).when(mockUserCohorts).abort(); cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(), @@ -139,7 +136,8 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest { final DataTreeCandidateTip candidate = preCommitSuccess(); doAnswer(invocation -> { - invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCommit(UnsignedLong.valueOf(0)); + invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCommit(UnsignedLong.valueOf(0), + () -> { }); return null; }).when(mockShardDataTree).startCommit(cohort, candidate); @@ -240,13 +238,10 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest { public void testAbortWithCohorts() throws Exception { doReturn(true).when(mockShardDataTree).startAbort(cohort); - final Promise> cohortFuture = akka.dispatch.Futures.promise(); - doReturn(Optional.of(Collections.singletonList(cohortFuture.future()))).when(mockUserCohorts).abort(); + doReturn(Optional.of(CompletableFuture.completedFuture(null))).when(mockUserCohorts).abort(); final Future abortFuture = abort(cohort); - cohortFuture.success(Collections.emptyList()); - abortFuture.get(); verify(mockShardDataTree).startAbort(cohort); }