BUG-7033: Allow ShardDataTree to pipeline transactions 75/28775/21
authorRobert Varga <rovarga@cisco.com>
Sat, 24 Oct 2015 16:34:53 +0000 (18:34 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 15 Dec 2016 18:53:45 +0000 (18:53 +0000)
InMemoryDataTree gives us the DataTreeTip, which allows another
DataTreeModificatoin to be prepared on top an uncommitted one. This
allows us to pipeline transactions if we manage our lifecycle properly.

For now this feature gives stricter abort validation, and has otherwise
no impact. In future, this allows another DataTreeModification to be
prepared and queue for persistence before replication of the previous
one finishes.

Change-Id: I7ab97c906a6403da780800edd335f74c403e5aa4
Signed-off-by: Robert Varga <rovarga@cisco.com>
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java

index bc9549a64071649daf33ee06e66a2a78dcd0511e..a04bf62a96d9e964208e0ac76cd29f104df649a6 100644 (file)
@@ -64,9 +64,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -108,6 +110,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private final Shard shard;
     private Runnable runOnPendingTransactionsComplete;
 
     private final Shard shard;
     private Runnable runOnPendingTransactionsComplete;
 
+    /**
+     * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a
+     * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
+     * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current
+     * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself.
+     */
+    private TipProducingDataTreeTip tip;
+
     private SchemaContext schemaContext;
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
     private SchemaContext schemaContext;
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
@@ -122,6 +132,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
         this.logContext = Preconditions.checkNotNull(logContext);
         this.metadata = ImmutableList.copyOf(metadata);
         this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
         this.logContext = Preconditions.checkNotNull(logContext);
         this.metadata = ImmutableList.copyOf(metadata);
+        tip = dataTree;
     }
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
     }
 
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
@@ -505,6 +516,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     @Deprecated
     public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
     @VisibleForTesting
     @Deprecated
     public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
+        // Direct modification commit is a utility, which cannot be used while we have transactions in-flight
+        Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending");
+
         modification.ready();
         dataTree.validate(modification);
         DataTreeCandidate candidate = dataTree.prepare(modification);
         modification.ready();
         dataTree.validate(modification);
         DataTreeCandidate candidate = dataTree.prepare(modification);
@@ -519,6 +533,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
 
         pendingTransactions.clear();
         }
 
         pendingTransactions.clear();
+        tip = dataTree;
         return ret;
     }
 
         return ret;
     }
 
@@ -536,7 +551,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
-                dataTree.validate(modification);
+                tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
                 entry.lastAccess = shard.ticker().read();
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
                 entry.lastAccess = shard.ticker().read();
@@ -591,7 +606,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
         final DataTreeCandidateTip candidate;
         try {
         Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
         final DataTreeCandidateTip candidate;
         try {
-            candidate = dataTree.prepare(cohort.getDataTreeModification());
+            candidate = tip.prepare(cohort.getDataTreeModification());
         } catch (Exception e) {
             failPreCommit(e);
             return;
         } catch (Exception e) {
             failPreCommit(e);
             return;
@@ -604,6 +619,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
             return;
         }
 
+        // Set the tip of the data tree.
+        tip = Verify.verifyNotNull(candidate);
+
         entry.lastAccess = shard.ticker().read();
         cohort.successfulPreCommit(candidate);
     }
         entry.lastAccess = shard.ticker().read();
         cohort.successfulPreCommit(candidate);
     }
@@ -629,6 +647,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
             return;
         }
 
+        // All pending candidates have been committed, reset the tip to the data tree
+        if (tip == candidate) {
+            tip = dataTree;
+        }
+
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
@@ -745,39 +768,77 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
         }
     }
 
-    void startAbort(final SimpleShardDataTreeCohort cohort) {
+    boolean startAbort(final SimpleShardDataTreeCohort cohort) {
         final Iterator<CommitEntry> it = pendingTransactions.iterator();
         if (!it.hasNext()) {
             LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
         final Iterator<CommitEntry> it = pendingTransactions.iterator();
         if (!it.hasNext()) {
             LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
-            return;
+            return true;
         }
 
         // First entry is special, as it may already be committing
         final CommitEntry first = it.next();
         if (cohort.equals(first.cohort)) {
             if (cohort.getState() != State.COMMIT_PENDING) {
         }
 
         // First entry is special, as it may already be committing
         final CommitEntry first = it.next();
         if (cohort.equals(first.cohort)) {
             if (cohort.getState() != State.COMMIT_PENDING) {
-                LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+                LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
                     cohort.getIdentifier());
 
                     cohort.getIdentifier());
 
-                pendingTransactions.remove();
+                it.remove();
+                rebasePreCommittedTransactions(it, dataTree);
                 processNextTransaction();
                 processNextTransaction();
-            } else {
-                LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+                return true;
             }
 
             }
 
-            return;
+            LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+            return false;
         }
 
         }
 
+        TipProducingDataTreeTip newTip = dataTree;
         while (it.hasNext()) {
             final CommitEntry e = it.next();
             if (cohort.equals(e.cohort)) {
                 LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
                 it.remove();
         while (it.hasNext()) {
             final CommitEntry e = it.next();
             if (cohort.equals(e.cohort)) {
                 LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
                 it.remove();
-                return;
+                rebasePreCommittedTransactions(it, newTip);
+                return true;
+            } else {
+                newTip = cohort.getCandidate();
             }
         }
 
         LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
             }
         }
 
         LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+        return true;
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void rebasePreCommittedTransactions(Iterator<CommitEntry> iter, TipProducingDataTreeTip newTip) {
+        tip = newTip;
+        while (iter.hasNext()) {
+            final SimpleShardDataTreeCohort cohort = iter.next().cohort;
+            if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
+                LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+
+                try {
+                    tip.validate(cohort.getDataTreeModification());
+                } catch (DataValidationFailedException | RuntimeException e) {
+                    LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+                    cohort.reportFailure(e);
+                }
+            } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
+                LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+
+                try {
+                    tip.validate(cohort.getDataTreeModification());
+                    DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
+                    cohort.userPreCommit(candidate);
+
+                    cohort.setNewCandidate(candidate);
+                    tip = candidate;
+                } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+                    LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+                    cohort.reportFailure(e);
+                }
+            }
+        }
     }
 
     void setRunOnPendingTransactionsComplete(final Runnable operation) {
     }
 
     void setRunOnPendingTransactionsComplete(final Runnable operation) {
index 4df9dea7dbdb4038403f0e0795b797e0bf10e000..6cb9badd8d3cd66a16ea1626f3e9983444484d1f 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
+@VisibleForTesting
 public abstract class ShardDataTreeCohort implements Identifiable<TransactionIdentifier> {
     public enum State {
         READY,
 public abstract class ShardDataTreeCohort implements Identifiable<TransactionIdentifier> {
     public enum State {
         READY,
index 0527d013f246357ba68def42af0b76146b009741..9791a596117ffc085efc89abbb815c16adb78381 100644 (file)
@@ -13,8 +13,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.base.Verify;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -28,7 +26,7 @@ import scala.concurrent.Future;
 
 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
 
 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
-    private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+
     private final DataTreeModification transaction;
     private final ShardDataTree dataTree;
     private final TransactionIdentifier transactionId;
     private final DataTreeModification transaction;
     private final ShardDataTree dataTree;
     private final TransactionIdentifier transactionId;
@@ -58,7 +56,6 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     }
 
     @Override
     }
 
     @Override
-
     DataTreeModification getDataTreeModification() {
         return transaction;
     }
     DataTreeModification getDataTreeModification() {
         return transaction;
     }
@@ -92,21 +89,25 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         }
     }
 
         }
     }
 
-
     @Override
     @Override
-    public void abort(final FutureCallback<Void> callback) {
-        dataTree.startAbort(this);
+    public void abort(final FutureCallback<Void> abortCallback) {
+        if (!dataTree.startAbort(this)) {
+            abortCallback.onSuccess(null);
+            return;
+        }
+
+        candidate = null;
         state = State.ABORTED;
 
         final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
         if (!maybeAborts.isPresent()) {
         state = State.ABORTED;
 
         final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
         if (!maybeAborts.isPresent()) {
-            callback.onSuccess(null);
+            abortCallback.onSuccess(null);
             return;
         }
 
         final Future<Iterable<Object>> aborts = maybeAborts.get();
         if (aborts.isCompleted()) {
             return;
         }
 
         final Future<Iterable<Object>> aborts = maybeAborts.get();
         if (aborts.isCompleted()) {
-            callback.onSuccess(null);
+            abortCallback.onSuccess(null);
             return;
         }
 
             return;
         }
 
@@ -114,9 +115,9 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
             @Override
             public void onComplete(final Throwable failure, final Iterable<Object> objs) {
                 if (failure != null) {
             @Override
             public void onComplete(final Throwable failure, final Iterable<Object> objs) {
                 if (failure != null) {
-                    callback.onFailure(failure);
+                    abortCallback.onFailure(failure);
                 } else {
                 } else {
-                    callback.onSuccess(null);
+                    abortCallback.onSuccess(null);
                 }
             }
         }, ExecutionContexts.global());
                 }
             }
         }, ExecutionContexts.global());
@@ -127,7 +128,12 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         checkState(State.PRE_COMMIT_COMPLETE);
         this.callback = Preconditions.checkNotNull(newCallback);
         state = State.COMMIT_PENDING;
         checkState(State.PRE_COMMIT_COMPLETE);
         this.callback = Preconditions.checkNotNull(newCallback);
         state = State.COMMIT_PENDING;
-        dataTree.startCommit(this, candidate);
+
+        if (nextFailure == null) {
+            dataTree.startCommit(this, candidate);
+        } else {
+            failedCommit(nextFailure);
+        }
     }
 
     private <T> FutureCallback<T> switchState(final State newState) {
     }
 
     private <T> FutureCallback<T> switchState(final State newState) {
@@ -139,6 +145,11 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         return ret;
     }
 
         return ret;
     }
 
+    void setNewCandidate(DataTreeCandidateTip dataTreeCandidate) {
+        checkState(State.PRE_COMMIT_COMPLETE);
+        this.candidate = Verify.verifyNotNull(dataTreeCandidate);
+    }
+
     void successfulCanCommit() {
         switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
     }
     void successfulCanCommit() {
         switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
     }
index 5a096acbb1206075e438a87042bf54877545182a..c38254769043d96d0902cfb27d1d7f4de2e15514 100644 (file)
@@ -42,7 +42,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
@@ -122,7 +121,7 @@ public class ShardDataTreeTest extends AbstractTest {
     public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException {
         final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
 
     public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException {
         final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
 
-        final List<DataTreeCandidateTip> candidates = new ArrayList<>();
+        final List<DataTreeCandidate> candidates = new ArrayList<>();
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
 
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
 
@@ -139,7 +138,7 @@ public class ShardDataTreeTest extends AbstractTest {
     public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException {
         final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
 
     public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException {
         final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
 
-        final List<DataTreeCandidateTip> candidates = new ArrayList<>();
+        final List<DataTreeCandidate> candidates = new ArrayList<>();
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
         candidates.add(addCar(shardDataTree));
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
         candidates.add(addCar(shardDataTree));
@@ -197,8 +196,8 @@ public class ShardDataTreeTest extends AbstractTest {
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    private static void verifyOnDataTreeChanged(DOMDataTreeChangeListener listener,
-            Consumer<DataTreeCandidate> callback) {
+    private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
+            final Consumer<DataTreeCandidate> callback) {
         ArgumentCaptor<Collection> changes = ArgumentCaptor.forClass(Collection.class);
         verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
         for (Collection list : changes.getAllValues()) {
         ArgumentCaptor<Collection> changes = ArgumentCaptor.forClass(Collection.class);
         verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
         for (Collection list : changes.getAllValues()) {
@@ -222,12 +221,12 @@ public class ShardDataTreeTest extends AbstractTest {
         return optional.get();
     }
 
         return optional.get();
     }
 
-    private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree)
+    private static DataTreeCandidate addCar(final ShardDataTree shardDataTree)
             throws ExecutionException, InterruptedException {
         return addCar(shardDataTree, "altima");
     }
 
             throws ExecutionException, InterruptedException {
         return addCar(shardDataTree, "altima");
     }
 
-    private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree, String name)
+    private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name)
             throws ExecutionException, InterruptedException {
         return doTransaction(shardDataTree, snapshot -> {
             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
             throws ExecutionException, InterruptedException {
         return doTransaction(shardDataTree, snapshot -> {
             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
@@ -236,7 +235,7 @@ public class ShardDataTreeTest extends AbstractTest {
         });
     }
 
         });
     }
 
-    private static DataTreeCandidateTip removeCar(final ShardDataTree shardDataTree)
+    private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree)
             throws ExecutionException, InterruptedException {
         return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
     }
             throws ExecutionException, InterruptedException {
         return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
     }
@@ -246,7 +245,7 @@ public class ShardDataTreeTest extends AbstractTest {
         void execute(DataTreeModification snapshot);
     }
 
         void execute(DataTreeModification snapshot);
     }
 
-    private static DataTreeCandidateTip doTransaction(final ShardDataTree shardDataTree,
+    private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
             final DataTreeOperation operation) throws ExecutionException, InterruptedException {
         final ReadWriteShardDataTreeTransaction transaction =
                 shardDataTree.newReadWriteTransaction(nextTransactionId());
             final DataTreeOperation operation) throws ExecutionException, InterruptedException {
         final ReadWriteShardDataTreeTransaction transaction =
                 shardDataTree.newReadWriteTransaction(nextTransactionId());
@@ -256,25 +255,25 @@ public class ShardDataTreeTest extends AbstractTest {
 
         immediateCanCommit(cohort);
         immediatePreCommit(cohort);
 
         immediateCanCommit(cohort);
         immediatePreCommit(cohort);
-        final DataTreeCandidateTip candidate = cohort.getCandidate();
+        final DataTreeCandidate candidate = cohort.getCandidate();
         immediateCommit(cohort);
 
         return candidate;
     }
 
         immediateCommit(cohort);
 
         return candidate;
     }
 
-    private static DataTreeCandidateTip applyCandidates(final ShardDataTree shardDataTree,
-            final List<DataTreeCandidateTip> candidates) throws ExecutionException, InterruptedException {
+    private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
+            final List<DataTreeCandidate> candidates) throws ExecutionException, InterruptedException {
         final ReadWriteShardDataTreeTransaction transaction =
                 shardDataTree.newReadWriteTransaction(nextTransactionId());
         final DataTreeModification snapshot = transaction.getSnapshot();
         final ReadWriteShardDataTreeTransaction transaction =
                 shardDataTree.newReadWriteTransaction(nextTransactionId());
         final DataTreeModification snapshot = transaction.getSnapshot();
-        for (final DataTreeCandidateTip candidateTip : candidates) {
+        for (final DataTreeCandidate candidateTip : candidates) {
             DataTreeCandidates.applyToModification(snapshot, candidateTip);
         }
         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
 
         immediateCanCommit(cohort);
         immediatePreCommit(cohort);
             DataTreeCandidates.applyToModification(snapshot, candidateTip);
         }
         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
 
         immediateCanCommit(cohort);
         immediatePreCommit(cohort);
-        final DataTreeCandidateTip candidate = cohort.getCandidate();
+        final DataTreeCandidate candidate = cohort.getCandidate();
         immediateCommit(cohort);
 
         return candidate;
         immediateCommit(cohort);
 
         return candidate;
index 0a392bfd0058302ea5a484c1815efb1b03b5e396..f6f6c26c1ed59a77eef354ee2832b01649852fcc 100644 (file)
@@ -1345,8 +1345,12 @@ public class ShardTest extends AbstractShardTest {
                 final InOrder inOrder = inOrder(dataTree);
                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
                 final InOrder inOrder = inOrder(dataTree);
                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+
+                // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
+                //        validate performs wrapping and we capture that mock
+                // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+
                 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
                 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
-                inOrder.verify(dataTree).validate(any(DataTreeModification.class));
             }
         };
     }
             }
         };
     }
index 7d2500b7168249685752f68505697e1662ed5c35..5fef104eaa33d116d2a31bf9fbe0f692504a493c 100644 (file)
@@ -230,7 +230,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
 
     @Test
     public void testAbort() throws Exception {
 
     @Test
     public void testAbort() throws Exception {
-        doNothing().when(mockShardDataTree).startAbort(cohort);
+        doReturn(true).when(mockShardDataTree).startAbort(cohort);
 
         abort(cohort).get();
         verify(mockShardDataTree).startAbort(cohort);
 
         abort(cohort).get();
         verify(mockShardDataTree).startAbort(cohort);
@@ -238,7 +238,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
 
     @Test
     public void testAbortWithCohorts() throws Exception {
 
     @Test
     public void testAbortWithCohorts() throws Exception {
-        doNothing().when(mockShardDataTree).startAbort(cohort);
+        doReturn(true).when(mockShardDataTree).startAbort(cohort);
 
         final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
         doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();
 
         final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
         doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();