CONTROLLER-1641: Handle commit cohorts async 85/66885/6
authorTom Pantelis <tompantelis@gmail.com>
Fri, 5 Jan 2018 01:04:41 +0000 (20:04 -0500)
committerRobert Varga <nite@hq.sk>
Mon, 5 Feb 2018 00:21:11 +0000 (00:21 +0000)
Modified CompositeDataTreeCohort to handle response messages from
the DataTreeCohortActor async. The canCommit, preCommit, and commit
methods now return an Optional CompletionStage that is present when
there are commit cohorts. On completion of the cohort actor Futures, a
Runnable is executed in the shard actor to process the responses and
complete the returned CompletionStage. The call sites in
SimpleShardDataTreeCohort execute their own callbacks on completion.

Change-Id: Ib1a22e059dabfe87a1ee141baae655601f9ce700
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java

index c4afcfd4d803cf275894d270c9d2574681558bfe..0ef49b6244c33a8a86fb7adec36106117c1f6175 100644 (file)
@@ -13,22 +13,25 @@ import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.dispatch.ExecutionContexts;
 import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
 import akka.dispatch.Recover;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
@@ -36,7 +39,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
+import scala.compat.java8.FutureConverters;
 import scala.concurrent.Future;
 
 /**
@@ -98,16 +101,19 @@ class CompositeDataTreeCohort {
     private final DataTreeCohortActorRegistry registry;
     private final TransactionIdentifier txId;
     private final SchemaContext schema;
+    private final Executor callbackExecutor;
     private final Timeout timeout;
 
-    private List<Success> successfulFromPrevious;
+    @Nonnull
+    private List<Success> successfulFromPrevious = Collections.emptyList();
     private State state = State.IDLE;
 
     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
-        final SchemaContext schema, final Timeout timeout) {
+        final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
         this.registry = Preconditions.checkNotNull(registry);
         this.txId = Preconditions.checkNotNull(transactionID);
         this.schema = Preconditions.checkNotNull(schema);
+        this.callbackExecutor = Preconditions.checkNotNull(callbackExecutor);
         this.timeout = Preconditions.checkNotNull(timeout);
     }
 
@@ -129,11 +135,11 @@ class CompositeDataTreeCohort {
                 throw new IllegalStateException("Unhandled state " + state);
         }
 
-        successfulFromPrevious = null;
+        successfulFromPrevious = Collections.emptyList();
         state = State.IDLE;
     }
 
-    void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
+    Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("{}: canCommit - candidate: {}", txId, tip);
         } else {
@@ -143,9 +149,9 @@ class CompositeDataTreeCohort {
         final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
         LOG.debug("{}: canCommit - messages: {}", txId, messages);
         if (messages.isEmpty()) {
-            successfulFromPrevious = ImmutableList.of();
+            successfulFromPrevious = Collections.emptyList();
             changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
-            return;
+            return Optional.empty();
         }
 
         final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
@@ -158,43 +164,41 @@ class CompositeDataTreeCohort {
         }
 
         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
-        processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
+        return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
     }
 
-    void preCommit() throws ExecutionException, TimeoutException {
+    Optional<CompletionStage<Void>> preCommit() {
         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
 
-        Preconditions.checkState(successfulFromPrevious != null);
         if (successfulFromPrevious.isEmpty()) {
             changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
-            return;
+            return Optional.empty();
         }
 
         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
             new DataTreeCohortActor.PreCommit(txId));
         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
-        processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
+        return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
     }
 
-    void commit() throws ExecutionException, TimeoutException {
+    Optional<CompletionStage<Void>> commit() {
         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
         if (successfulFromPrevious.isEmpty()) {
             changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
-            return;
+            return Optional.empty();
         }
 
-        Preconditions.checkState(successfulFromPrevious != null);
         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
             new DataTreeCohortActor.Commit(txId));
         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
-        processResponses(futures, State.COMMIT_SENT, State.COMMITED);
+        return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
     }
 
-    Optional<List<Future<Object>>> abort() {
+    Optional<CompletionStage<?>> abort() {
         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
 
         state = State.ABORTED;
-        if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) {
+        if (successfulFromPrevious.isEmpty()) {
             return Optional.empty();
         }
 
@@ -203,7 +207,8 @@ class CompositeDataTreeCohort {
         for (Success s : successfulFromPrevious) {
             futures.add(Patterns.ask(s.getCohort(), message, timeout));
         }
-        return Optional.of(futures);
+
+        return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
     }
 
     private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
@@ -217,37 +222,38 @@ class CompositeDataTreeCohort {
         return ret;
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void processResponses(final List<Entry<ActorRef, Future<Object>>> futures, final State currentState,
-            final State afterState) throws TimeoutException, ExecutionException {
+    @Nonnull
+    private CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
+            final State currentState, final State afterState) {
         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+        final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
+        Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
+                ExecutionContexts.global());
 
-        final Iterable<Object> results;
-        try {
-            results = Await.result(Futures.sequence(Lists.transform(futures, Entry::getValue),
-                ExecutionContexts.global()), timeout.duration());
-        } catch (TimeoutException e) {
-            successfulFromPrevious = null;
-            LOG.debug("{}: processResponses - error from Future", txId, e);
-
-            for (Entry<ActorRef, Future<Object>> f : futures) {
-                if (!f.getValue().isCompleted()) {
-                    LOG.info("{}: actor {} failed to respond", txId, f.getKey());
-                }
+        aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
+            @Override
+            public void onComplete(Throwable failure, Iterable<Object> results) {
+                callbackExecutor.execute(
+                    () -> processResponses(failure, results, currentState, afterState, returnFuture));
             }
-            throw e;
-        } catch (ExecutionException e) {
-            successfulFromPrevious = null;
-            LOG.debug("{}: processResponses - error from Future", txId, e);
-            throw e;
-        } catch (Exception e) {
-            successfulFromPrevious = null;
-            LOG.debug("{}: processResponses - error from Future", txId, e);
-            throw new ExecutionException(e);
+        }, ExecutionContexts.global());
+
+        return returnFuture;
+    }
+
+    // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
+    // generic type is Void.
+    @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
+    private void processResponses(Throwable failure, Iterable<Object> results, State currentState, State afterState,
+            CompletableFuture<Void> resultFuture) {
+        if (failure != null) {
+            successfulFromPrevious = Collections.emptyList();
+            resultFuture.completeExceptionally(failure);
+            return;
         }
 
         final Collection<Failure> failed = new ArrayList<>(1);
-        final List<Success> successful = new ArrayList<>(futures.size());
+        final List<Success> successful = new ArrayList<>();
         for (Object result : results) {
             if (result instanceof DataTreeCohortActor.Success) {
                 successful.add((Success) result);
@@ -260,7 +266,6 @@ class CompositeDataTreeCohort {
 
         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
 
-        successfulFromPrevious = successful;
         if (!failed.isEmpty()) {
             changeStateFrom(currentState, State.FAILED);
             final Iterator<Failure> it = failed.iterator();
@@ -268,11 +273,14 @@ class CompositeDataTreeCohort {
             while (it.hasNext()) {
                 firstEx.addSuppressed(it.next().cause());
             }
-            Throwables.throwIfInstanceOf(firstEx, ExecutionException.class);
-            Throwables.throwIfInstanceOf(firstEx, TimeoutException.class);
-            throw new ExecutionException(firstEx);
+
+            successfulFromPrevious = Collections.emptyList();
+            resultFuture.completeExceptionally(firstEx);
+        } else {
+            successfulFromPrevious = successful;
+            changeStateFrom(currentState, afterState);
+            resultFuture.complete(null);
         }
-        changeStateFrom(currentState, afterState);
     }
 
     void changeStateFrom(final State expected, final State followup) {
index 572abeec7d1bdcf450421a7c4137fc82b60d09f5..1dd0f2856bd98c05fa99389348ec6a57dd9419d3 100644 (file)
@@ -20,6 +20,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
@@ -219,7 +220,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
     }
 
     CompositeDataTreeCohort createCohort(final SchemaContext schemaContext, final TransactionIdentifier txId,
-            final Timeout commitStepTimeout) {
-        return new CompositeDataTreeCohort(this, txId, schemaContext, commitStepTimeout);
+            final Executor callbackExecutor, final Timeout commitStepTimeout) {
+        return new CompositeDataTreeCohort(this, txId, schemaContext, callbackExecutor, commitStepTimeout);
     }
 }
index 9aca517bb110d2f28381167962f2ad8f3de288af..a155e5e0e049ab5afc19e2dc969bbcd775c0a494 100644 (file)
@@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.IOException;
@@ -31,7 +32,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -808,7 +808,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingTransaction();
     }
 
-    private void failPreCommit(final Exception cause) {
+    private void failPreCommit(final Throwable cause) {
         shard.getShardMBean().incrementFailedTransactionsCount();
         pendingTransactions.poll().cohort.failedPreCommit(cause);
         processNextPendingTransaction();
@@ -827,25 +827,34 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final DataTreeCandidateTip candidate;
         try {
             candidate = tip.prepare(cohort.getDataTreeModification());
-            cohort.userPreCommit(candidate);
-        } catch (ExecutionException | TimeoutException | RuntimeException e) {
+        } catch (RuntimeException e) {
             failPreCommit(e);
             return;
         }
 
-        // Set the tip of the data tree.
-        tip = Verify.verifyNotNull(candidate);
+        cohort.userPreCommit(candidate, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void noop) {
+                // Set the tip of the data tree.
+                tip = Verify.verifyNotNull(candidate);
 
-        entry.lastAccess = readTime();
+                entry.lastAccess = readTime();
 
-        pendingTransactions.remove();
-        pendingCommits.add(entry);
+                pendingTransactions.remove();
+                pendingCommits.add(entry);
 
-        LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+                LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
 
-        cohort.successfulPreCommit(candidate);
+                cohort.successfulPreCommit(candidate);
 
-        processNextPendingTransaction();
+                processNextPendingTransaction();
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                failPreCommit(failure);
+            }
+        });
     }
 
     private void failCommit(final Exception cause) {
@@ -878,12 +887,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
         // FIXME: propagate journal index
-        pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+        pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> {
+            LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+            notifyListeners(candidate);
 
-        LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
-        notifyListeners(candidate);
-
-        processNextPending();
+            processNextPending();
+        });
     }
 
     void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
@@ -958,7 +967,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @Override
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
-                cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+                cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
+                        COMMIT_STEP_TIMEOUT));
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;
     }
@@ -1142,11 +1152,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 try {
                     tip.validate(cohort.getDataTreeModification());
                     DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
-                    cohort.userPreCommit(candidate);
 
                     cohort.setNewCandidate(candidate);
                     tip = candidate;
-                } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+                } catch (RuntimeException | DataValidationFailedException e) {
                     LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
                     cohort.reportFailure(e);
                 }
index cf9948cf03e0407f47103e4e2085d33a58d0346d..3104cb4972de5973f270ceb32593fd809bb58088 100644 (file)
@@ -7,25 +7,19 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.Futures;
-import akka.dispatch.OnComplete;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletionStage;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
 
 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
@@ -116,28 +110,19 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         candidate = null;
         state = State.ABORTED;
 
-        final Optional<List<Future<Object>>> maybeAborts = userCohorts.abort();
+        final Optional<CompletionStage<?>> maybeAborts = userCohorts.abort();
         if (!maybeAborts.isPresent()) {
             abortCallback.onSuccess(null);
             return;
         }
 
-        final Future<Iterable<Object>> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global());
-        if (aborts.isCompleted()) {
-            abortCallback.onSuccess(null);
-            return;
-        }
-
-        aborts.onComplete(new OnComplete<Iterable<Object>>() {
-            @Override
-            public void onComplete(final Throwable failure, final Iterable<Object> objs) {
-                if (failure != null) {
-                    abortCallback.onFailure(failure);
-                } else {
-                    abortCallback.onSuccess(null);
-                }
+        maybeAborts.get().whenComplete((noop, failure) -> {
+            if (failure != null) {
+                abortCallback.onFailure(failure);
+            } else {
+                abortCallback.onSuccess(null);
             }
-        }, ExecutionContexts.global());
+        });
     }
 
     @Override
@@ -180,14 +165,40 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
      * any failure to validate is propagated before we record the transaction.
      *
      * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
-     * @throws ExecutionException if the operation fails
-     * @throws TimeoutException if the operation times out
+     * @param futureCallback the callback to invoke on completion, which may be immediate or async.
      */
-    // FIXME: this should be asynchronous
-    void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException {
+    void userPreCommit(final DataTreeCandidate dataTreeCandidate, final FutureCallback<Void> futureCallback) {
         userCohorts.reset();
-        userCohorts.canCommit(dataTreeCandidate);
-        userCohorts.preCommit();
+
+        final Optional<CompletionStage<Void>> maybeCanCommitFuture = userCohorts.canCommit(dataTreeCandidate);
+        if (!maybeCanCommitFuture.isPresent()) {
+            doUserPreCommit(futureCallback);
+            return;
+        }
+
+        maybeCanCommitFuture.get().whenComplete((noop, failure) -> {
+            if (failure != null) {
+                futureCallback.onFailure(failure);
+            } else {
+                doUserPreCommit(futureCallback);
+            }
+        });
+    }
+
+    private void doUserPreCommit(final FutureCallback<Void> futureCallback) {
+        final Optional<CompletionStage<Void>> maybePreCommitFuture = userCohorts.preCommit();
+        if (!maybePreCommitFuture.isPresent()) {
+            futureCallback.onSuccess(null);
+            return;
+        }
+
+        maybePreCommitFuture.get().whenComplete((noop, failure) -> {
+            if (failure != null) {
+                futureCallback.onFailure(failure);
+            } else {
+                futureCallback.onSuccess(null);
+            }
+        });
     }
 
     void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
@@ -196,7 +207,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
     }
 
-    void failedPreCommit(final Exception cause) {
+    void failedPreCommit(final Throwable cause) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Transaction {} failed to prepare", transaction, cause);
         } else {
@@ -207,15 +218,25 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         switchState(State.FAILED).onFailure(cause);
     }
 
-    void successfulCommit(final UnsignedLong journalIndex) {
-        try {
-            userCohorts.commit();
-        } catch (TimeoutException | ExecutionException e) {
-            // We are probably dead, depending on what the cohorts end up doing
-            LOG.error("User cohorts failed to commit", e);
+    void successfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
+        final Optional<CompletionStage<Void>> maybeCommitFuture = userCohorts.commit();
+        if (!maybeCommitFuture.isPresent()) {
+            finishSuccessfulCommit(journalIndex, onComplete);
+            return;
         }
 
+        maybeCommitFuture.get().whenComplete((noop, failure) -> {
+            if (failure != null) {
+                LOG.error("User cohorts failed to commit", failure);
+            }
+
+            finishSuccessfulCommit(journalIndex, onComplete);
+        });
+    }
+
+    private void finishSuccessfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
         switchState(State.COMMITTED).onSuccess(journalIndex);
+        onComplete.run();
     }
 
     void failedCommit(final Exception cause) {
index 59933f874495768b60534d111f750e9ba91ae71b..4550894fed33c07f30d8e7ba6b2a133df387297a 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertSame;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -19,7 +18,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
@@ -33,7 +31,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import scala.concurrent.Promise;
 
 /**
  * Unit tests for SimpleShardDataTreeCohort.
@@ -59,7 +56,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
     public void setup() throws Exception {
         MockitoAnnotations.initMocks(this);
 
-        doNothing().when(mockUserCohorts).commit();
+        doReturn(Optional.empty()).when(mockUserCohorts).commit();
         doReturn(Optional.empty()).when(mockUserCohorts).abort();
 
         cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(),
@@ -139,7 +136,8 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
         final DataTreeCandidateTip candidate = preCommitSuccess();
 
         doAnswer(invocation -> {
-            invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCommit(UnsignedLong.valueOf(0));
+            invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCommit(UnsignedLong.valueOf(0),
+                () -> { });
             return null;
         }).when(mockShardDataTree).startCommit(cohort, candidate);
 
@@ -240,13 +238,10 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
     public void testAbortWithCohorts() throws Exception {
         doReturn(true).when(mockShardDataTree).startAbort(cohort);
 
-        final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
-        doReturn(Optional.of(Collections.singletonList(cohortFuture.future()))).when(mockUserCohorts).abort();
+        doReturn(Optional.of(CompletableFuture.completedFuture(null))).when(mockUserCohorts).abort();
 
         final Future<?> abortFuture = abort(cohort);
 
-        cohortFuture.success(Collections.emptyList());
-
         abortFuture.get();
         verify(mockShardDataTree).startAbort(cohort);
     }