BUG-5280: expand ShardDataTree to cover transaction mechanics 97/42497/27
authorRobert Varga <rovarga@cisco.com>
Mon, 25 Jul 2016 18:59:37 +0000 (20:59 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 5 Aug 2016 13:10:41 +0000 (13:10 +0000)
A chunk of ShardCommitCoordinator should actually be implemented
by ShardDataTree. This includes transaction queueing, commit timers,
interaction with user cohorts and persistence.

This patch implements the relevant operations in an message-agnostic,
callback-driven way.

Fix: ShardDataTreeTest (missing ShardStat MBean)

Change-Id: I353bacce8245df85c5f4d6b4cc0ce5416f2f0337
Signed-off-by: Robert Varga <rovarga@cisco.com>
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
27 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortActorRegistry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/CandidateListChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnerChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipStatisticsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModificationTest.java

index 9c2e91d..401c15b 100644 (file)
@@ -8,9 +8,11 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.slf4j.Logger;
@@ -29,33 +31,36 @@ final class ChainedCommitCohort extends ShardDataTreeCohort {
     }
 
     @Override
-    public ListenableFuture<Void> commit() {
-        final ListenableFuture<Void> ret = delegate.commit();
-
-        Futures.addCallback(ret, new FutureCallback<Void>() {
+    public void commit(final FutureCallback<UnsignedLong> callback) {
+        delegate.commit(new FutureCallback<UnsignedLong>() {
             @Override
-            public void onSuccess(Void result) {
+            public void onSuccess(final UnsignedLong result) {
                 chain.clearTransaction(transaction);
                 LOG.debug("Committed transaction {}", transaction);
+                callback.onSuccess(result);
             }
 
             @Override
-            public void onFailure(Throwable t) {
+            public void onFailure(final Throwable t) {
                 LOG.error("Transaction {} commit failed, cannot recover", transaction, t);
+                callback.onFailure(t);
             }
         });
+    }
 
-        return ret;
+    @Override
+    public TransactionIdentifier getIdentifier() {
+        return delegate.getIdentifier();
     }
 
     @Override
-    public ListenableFuture<Boolean> canCommit() {
-        return delegate.canCommit();
+    public void canCommit(final FutureCallback<Void> callback) {
+        delegate.canCommit(callback);
     }
 
     @Override
-    public ListenableFuture<Void> preCommit() {
-        return delegate.preCommit();
+    public void preCommit(final FutureCallback<DataTreeCandidate> callback) {
+        delegate.preCommit(callback);
     }
 
     @Override
@@ -72,4 +77,14 @@ final class ChainedCommitCohort extends ShardDataTreeCohort {
     DataTreeModification getDataTreeModification() {
         return delegate.getDataTreeModification();
     }
+
+    @Override
+    public boolean isFailed() {
+        return delegate.isFailed();
+    }
+
+    @Override
+    public State getState() {
+        return delegate.getState();
+    }
 }
\ No newline at end of file
index 06d3ec9..767749a 100644 (file)
@@ -8,38 +8,22 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.util.Timeout;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.Duration;
 
 final class CohortEntry {
-    enum State {
-        PENDING,
-        CAN_COMMITTED,
-        PRE_COMMITTED,
-        COMMITTED,
-        ABORTED
-    }
-
-    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
-
-    private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
     private final ReadWriteShardDataTreeTransaction transaction;
     private final TransactionIdentifier transactionID;
-    private final CompositeDataTreeCohort userCohorts;
     private final short clientVersion;
 
-    private State state = State.PENDING;
     private RuntimeException lastBatchedModificationsException;
     private int totalBatchedModificationsReceived;
     private ShardDataTreeCohort cohort;
@@ -47,36 +31,25 @@ final class CohortEntry {
     private ActorRef replySender;
     private Shard shard;
 
-    private CohortEntry(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
-            DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
+    private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
         this.transaction = Preconditions.checkNotNull(transaction);
-        this.transactionID = Preconditions.checkNotNull(transactionID);
+        this.transactionID = transaction.getId();
         this.clientVersion = clientVersion;
-        this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
     }
 
-    private CohortEntry(TransactionIdentifier transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
-            SchemaContext schema, short clientVersion) {
-        this.transactionID = Preconditions.checkNotNull(transactionID);
-        this.cohort = cohort;
+    private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) {
+        this.cohort = Preconditions.checkNotNull(cohort);
+        this.transactionID = cohort.getIdentifier();
         this.transaction = null;
         this.clientVersion = clientVersion;
-        this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
-    }
-
-    static CohortEntry createOpen(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
-            DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
-        return new CohortEntry(transactionID, transaction, cohortRegistry, schema, clientVersion);
     }
 
-    static CohortEntry createReady(TransactionIdentifier transactionID, ShardDataTreeCohort cohort,
-            DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
-        return new CohortEntry(transactionID, cohort, cohortRegistry, schema, clientVersion);
+    static CohortEntry createOpen(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
+        return new CohortEntry(transaction, clientVersion);
     }
 
-    void updateLastAccessTime() {
-        lastAccessTimer.reset();
-        lastAccessTimer.start();
+    static CohortEntry createReady(final ShardDataTreeCohort cohort, final short clientVersion) {
+        return new CohortEntry(cohort, clientVersion);
     }
 
     TransactionIdentifier getTransactionID() {
@@ -87,12 +60,8 @@ final class CohortEntry {
         return clientVersion;
     }
 
-    State getState() {
-        return state;
-    }
-
-    DataTreeCandidate getCandidate() {
-        return cohort.getCandidate();
+    boolean isFailed() {
+        return cohort != null && cohort.isFailed();
     }
 
     DataTreeModification getDataTreeModification() {
@@ -111,7 +80,7 @@ final class CohortEntry {
         return lastBatchedModificationsException;
     }
 
-    void applyModifications(Iterable<Modification> modifications) {
+    void applyModifications(final Iterable<Modification> modifications) {
         totalBatchedModificationsReceived++;
         if(lastBatchedModificationsException == null) {
             for (Modification modification : modifications) {
@@ -125,43 +94,25 @@ final class CohortEntry {
         }
     }
 
-    boolean canCommit() throws InterruptedException, ExecutionException {
-        state = State.CAN_COMMITTED;
-
-        // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
-        // about possibly accessing our state on a different thread outside of our dispatcher.
-        // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
-        // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
-        // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
-        return cohort.canCommit().get();
+    void canCommit(final FutureCallback<Void> callback) {
+        cohort.canCommit(callback);
     }
 
-
-
-    void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
-        state = State.PRE_COMMITTED;
-        cohort.preCommit().get();
-        userCohorts.canCommit(cohort.getCandidate());
-        userCohorts.preCommit();
+    void preCommit(final FutureCallback<DataTreeCandidate> callback) {
+        cohort.preCommit(callback);
     }
 
-    void commit() throws InterruptedException, ExecutionException, TimeoutException {
-        state = State.COMMITTED;
-        cohort.commit().get();
-        userCohorts.commit();
+    void commit(final FutureCallback<UnsignedLong> callback) {
+        cohort.commit(callback);
     }
 
     void abort() throws InterruptedException, ExecutionException, TimeoutException {
-        state = State.ABORTED;
         cohort.abort().get();
-        userCohorts.abort();
     }
 
-    void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
+    void ready(final CohortDecorator cohortDecorator) {
         Preconditions.checkState(cohort == null, "cohort was already set");
 
-        setDoImmediateCommit(doImmediateCommit);
-
         cohort = transaction.ready();
 
         if(cohortDecorator != null) {
@@ -170,19 +121,11 @@ final class CohortEntry {
         }
     }
 
-    boolean isReadyToCommit() {
-        return replySender != null;
-    }
-
-    boolean isExpired(long expireTimeInMillis) {
-        return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
-    }
-
     boolean isDoImmediateCommit() {
         return doImmediateCommit;
     }
 
-    void setDoImmediateCommit(boolean doImmediateCommit) {
+    void setDoImmediateCommit(final boolean doImmediateCommit) {
         this.doImmediateCommit = doImmediateCommit;
     }
 
@@ -190,7 +133,7 @@ final class CohortEntry {
         return replySender;
     }
 
-    void setReplySender(ActorRef replySender) {
+    void setReplySender(final ActorRef replySender) {
         this.replySender = replySender;
     }
 
@@ -198,15 +141,10 @@ final class CohortEntry {
         return shard;
     }
 
-    void setShard(Shard shard) {
+    void setShard(final Shard shard) {
         this.shard = shard;
     }
 
-
-    boolean isAborted() {
-        return state == State.ABORTED;
-    }
-
     @Override
     public String toString() {
         final StringBuilder builder = new StringBuilder();
index d833962..8115473 100644 (file)
@@ -21,12 +21,13 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -85,7 +86,7 @@ class CompositeDataTreeCohort {
 
     protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
         @Override
-        public Failure recover(Throwable error) throws Throwable {
+        public Failure recover(final Throwable error) throws Throwable {
             return new Failure(error);
         }
     };
@@ -98,21 +99,21 @@ class CompositeDataTreeCohort {
     private Iterable<Success> successfulFromPrevious;
     private State state = State.IDLE;
 
-    CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, TransactionIdentifier transactionID,
-        SchemaContext schema, Timeout timeout) {
+    CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
+        final SchemaContext schema, final Timeout timeout) {
         this.registry = Preconditions.checkNotNull(registry);
         this.txId = Preconditions.checkNotNull(transactionID);
         this.schema = Preconditions.checkNotNull(schema);
         this.timeout = Preconditions.checkNotNull(timeout);
     }
 
-    void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException {
+    void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
         Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
         // FIXME: Optimize empty collection list with pre-created futures, containing success.
         Future<Iterable<Object>> canCommitsFuture =
                 Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
                     @Override
-                    public Future<Object> apply(CanCommit input) {
+                    public Future<Object> apply(final CanCommit input) {
                         return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
                                 ExecutionContexts.global());
                     }
@@ -135,24 +136,26 @@ class CompositeDataTreeCohort {
         processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
     }
 
-    void abort() throws TimeoutException {
+    Optional<Future<Iterable<Object>>> abort() {
         if (successfulFromPrevious != null) {
-            sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId));
+            return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
         }
+
+        return Optional.empty();
     }
 
     private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
         return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
 
             @Override
-            public Future<Object> apply(DataTreeCohortActor.Success cohortResponse) throws Exception {
+            public Future<Object> apply(final DataTreeCohortActor.Success cohortResponse) throws Exception {
                 return Patterns.ask(cohortResponse.getCohort(), message, timeout);
             }
 
         }, ExecutionContexts.global());
     }
 
-    private void processResponses(Future<Iterable<Object>> resultsFuture, State currentState, State afterState)
+    private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState, final State afterState)
             throws TimeoutException, ExecutionException {
         final Iterable<Object> results;
         try {
@@ -179,7 +182,7 @@ class CompositeDataTreeCohort {
         changeStateFrom(currentState, afterState);
     }
 
-    void changeStateFrom(State expected, State followup) {
+    void changeStateFrom(final State expected, final State followup) {
         Preconditions.checkState(state == expected);
         state = followup;
     }
index 28b3c70..fb3743d 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Status;
+import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -48,7 +49,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
     private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
 
 
-    void registerCohort(ActorRef sender, RegisterCohort cohort) {
+    void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
         takeLock();
         try {
             final ActorRef cohortRef = cohort.getCohort();
@@ -65,7 +66,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
         sender.tell(new Status.Success(null), ActorRef.noSender());
     }
 
-    void removeCommitCohort(ActorRef sender, RemoveCohort message) {
+    void removeCommitCohort(final ActorRef sender, final RemoveCohort message) {
         final ActorRef cohort = message.getCohort();
         final RegistrationTreeNode<ActorRef> node = cohortToNode.get(cohort);
         if (node != null) {
@@ -76,14 +77,14 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
         cohort.tell(PoisonPill.getInstance(), cohort);
     }
 
-    Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(TransactionIdentifier txId,
-            DataTreeCandidate candidate, SchemaContext schema) {
+    Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(final TransactionIdentifier txId,
+            final DataTreeCandidate candidate, final SchemaContext schema) {
         try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
             return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
         }
     }
 
-    void process(ActorRef sender, CohortRegistryCommand message) {
+    void process(final ActorRef sender, final CohortRegistryCommand message) {
         if (message instanceof RegisterCohort) {
             registerCohort(sender, (RegisterCohort) message);
         } else if (message instanceof RemoveCohort) {
@@ -95,7 +96,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
 
         private final ActorRef cohort;
 
-        CohortRegistryCommand(ActorRef cohort) {
+        CohortRegistryCommand(final ActorRef cohort) {
             this.cohort = Preconditions.checkNotNull(cohort);
         }
 
@@ -108,7 +109,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
 
         private final DOMDataTreeIdentifier path;
 
-        RegisterCohort(DOMDataTreeIdentifier path, ActorRef cohort) {
+        RegisterCohort(final DOMDataTreeIdentifier path, final ActorRef cohort) {
             super(cohort);
             this.path = path;
 
@@ -122,7 +123,7 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
 
     static class RemoveCohort extends CohortRegistryCommand {
 
-        RemoveCohort(ActorRef cohort) {
+        RemoveCohort(final ActorRef cohort) {
             super(cohort);
         }
 
@@ -136,14 +137,14 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
         private final Collection<DataTreeCohortActor.CanCommit> messages =
                 new ArrayList<>();
 
-        CanCommitMessageBuilder(TransactionIdentifier txId, DataTreeCandidate candidate, SchemaContext schema) {
+        CanCommitMessageBuilder(final TransactionIdentifier txId, final DataTreeCandidate candidate, final SchemaContext schema) {
             this.txId = Preconditions.checkNotNull(txId);
             this.candidate = Preconditions.checkNotNull(candidate);
             this.schema = schema;
         }
 
-        private void lookupAndCreateCanCommits(List<PathArgument> args, int offset,
-                RegistrationTreeNode<ActorRef> node) {
+        private void lookupAndCreateCanCommits(final List<PathArgument> args, final int offset,
+                final RegistrationTreeNode<ActorRef> node) {
 
             if (args.size() != offset) {
                 final PathArgument arg = args.get(offset);
@@ -159,8 +160,8 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
             }
         }
 
-        private void lookupAndCreateCanCommits(YangInstanceIdentifier path, RegistrationTreeNode<ActorRef> regNode,
-                DataTreeCandidateNode candNode) {
+        private void lookupAndCreateCanCommits(final YangInstanceIdentifier path, final RegistrationTreeNode<ActorRef> regNode,
+                final DataTreeCandidateNode candNode) {
             if (candNode.getModificationType() == ModificationType.UNMODIFIED) {
                 LOG.debug("Skipping unmodified candidate {}", path);
                 return;
@@ -186,8 +187,8 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
             }
         }
 
-        private void createCanCommits(Collection<ActorRef> regs, YangInstanceIdentifier path,
-                DataTreeCandidateNode node) {
+        private void createCanCommits(final Collection<ActorRef> regs, final YangInstanceIdentifier path,
+                final DataTreeCandidateNode node) {
             final DOMDataTreeCandidate candidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node);
             for (final ActorRef reg : regs) {
                 final CanCommit message = new DataTreeCohortActor.CanCommit(txId, candidate, schema, reg);
@@ -195,15 +196,19 @@ class DataTreeCohortActorRegistry extends AbstractRegistrationTree<ActorRef> {
             }
         }
 
-        private static DOMDataTreeIdentifier treeIdentifier(YangInstanceIdentifier path) {
+        private static DOMDataTreeIdentifier treeIdentifier(final YangInstanceIdentifier path) {
             return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path);
         }
 
-        private Collection<DataTreeCohortActor.CanCommit> perform(RegistrationTreeNode<ActorRef> rootNode) {
+        private Collection<DataTreeCohortActor.CanCommit> perform(final RegistrationTreeNode<ActorRef> rootNode) {
             final List<PathArgument> toLookup = candidate.getRootPath().getPathArguments();
             lookupAndCreateCanCommits(toLookup, 0, rootNode);
             return messages;
         }
     }
 
+    CompositeDataTreeCohort createCohort(final SchemaContext schemaContext, final TransactionIdentifier txId,
+            final Timeout commitStepTimeout) {
+        return new CompositeDataTreeCohort(this, txId, schemaContext, commitStepTimeout);
+    }
 }
index bd44f0b..3fe349f 100644 (file)
@@ -16,12 +16,11 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
+import com.google.common.base.Ticker;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -39,7 +38,6 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
@@ -52,7 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@ -67,9 +64,8 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
@@ -134,7 +130,7 @@ public class Shard extends RaftActor {
 
     private final ShardTransactionMessageRetrySupport messageRetrySupport;
 
-    protected Shard(AbstractBuilder<?, ?> builder) {
+    protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
@@ -146,9 +142,17 @@ public class Shard extends RaftActor {
 
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
-        store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType(),
-                new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"),
-                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"), name);
+        ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
+                new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
+        ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
+                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
+        if(builder.getDataTree() != null) {
+            store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
+                    treeChangeListenerPublisher, dataChangeListenerPublisher, name);
+        } else {
+            store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
+                    treeChangeListenerPublisher, dataChangeListenerPublisher, name);
+        }
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                 datastoreContext.getDataStoreMXBeanType());
@@ -158,9 +162,7 @@ public class Shard extends RaftActor {
             getContext().become(new MeteringBehavior(this));
         }
 
-        commitCoordinator = new ShardCommitCoordinator(store,
-                datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
-                datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, this.name);
+        commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name);
 
         setTransactionCommitTimeout();
 
@@ -185,7 +187,7 @@ public class Shard extends RaftActor {
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
     }
 
-    private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
+    private Optional<ActorRef> createRoleChangeNotifier(final String shardId) {
         ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
             RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
         return Optional.of(shardRoleChangeNotifier);
@@ -199,7 +201,7 @@ public class Shard extends RaftActor {
 
         messageRetrySupport.close();
 
-        if(txCommitTimeoutCheckSchedule != null) {
+        if (txCommitTimeoutCheckSchedule != null) {
             txCommitTimeoutCheckSchedule.cancel();
         }
 
@@ -255,24 +257,25 @@ public class Shard extends RaftActor {
                 setPeerAddress(resolved.getPeerId().toString(),
                         resolved.getPeerAddress());
             } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
+                store.checkForExpiredTransactions(transactionCommitTimeout);
                 commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
-            } else if(message instanceof DatastoreContext) {
+            } else if (message instanceof DatastoreContext) {
                 onDatastoreContext((DatastoreContext)message);
-            } else if(message instanceof RegisterRoleChangeListener){
+            } else if (message instanceof RegisterRoleChangeListener){
                 roleChangeNotifier.get().forward(message, context());
             } else if (message instanceof FollowerInitialSyncUpStatus) {
                 shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
                 context().parent().tell(message, self());
-            } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
+            } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)){
                 sender().tell(getShardMBean(), self());
-            } else if(message instanceof GetShardDataTree) {
+            } else if (message instanceof GetShardDataTree) {
                 sender().tell(store.getDataTree(), self());
-            } else if(message instanceof ServerRemoved){
+            } else if (message instanceof ServerRemoved){
                 context().parent().forward(message, context());
-            } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
+            } else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
                 messageRetrySupport.onTimerMessage(message);
             } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
-                commitCoordinator.processCohortRegistryCommand(getSender(),
+                store.processCohortRegistryCommand(getSender(),
                         (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
             } else {
                 super.handleNonRaftCommand(message);
@@ -285,7 +288,7 @@ public class Shard extends RaftActor {
     }
 
     public int getPendingTxCommitQueueSize() {
-        return commitCoordinator.getQueueSize();
+        return store.getQueueSize();
     }
 
     public int getCohortCacheSize() {
@@ -298,61 +301,40 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
+    protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) {
         return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
                 : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
 
-    protected void onDatastoreContext(DatastoreContext context) {
+    protected void onDatastoreContext(final DatastoreContext context) {
         datastoreContext = context;
 
-        commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
-
         setTransactionCommitTimeout();
 
-        if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
+        if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
             setPersistence(true);
-        } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
+        } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
             setPersistence(false);
         }
 
         updateConfigParams(datastoreContext.getShardRaftConfig());
     }
 
-    private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
-        return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
+    boolean canSkipPayload() {
+        // If we do not have any followers and we are not using persistence we can apply modification to the state
+        // immediately
+        return !hasFollowers() && !persistence().isRecoveryApplicable();
     }
 
-    void continueCommit(final CohortEntry cohortEntry) {
-        final DataTreeCandidate candidate = cohortEntry.getCandidate();
-        final TransactionIdentifier transactionId = cohortEntry.getTransactionID();
-
-        // If we do not have any followers and we are not using persistence
-        // or if cohortEntry has no modifications
-        // we can apply modification to the state immediately
-        if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
-            applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate);
-            return;
-        }
-
-        final Payload payload;
-        try {
-            payload = CommitTransactionPayload.create(transactionId, candidate);
-        } catch (IOException e) {
-            LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate,
-                e);
-            // TODO: do we need to do something smarter here?
-            throw Throwables.propagate(e);
-        }
-
-        persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload);
+    // applyState() will be invoked once consensus is reached on the payload
+    void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
+        // We are faking the sender
+        persistData(self(), transactionId, payload);
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
         if (isLeader()) {
-            if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
-                shardMBean.incrementFailedTransactionsCount();
-            }
+            commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -365,86 +347,6 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID,
-            @Nonnull final CohortEntry cohortEntry) {
-        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
-
-        try {
-            try {
-            cohortEntry.commit();
-            } catch(ExecutionException e) {
-                // We may get a "store tree and candidate base differ" IllegalStateException from commit under
-                // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
-                // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
-                // applying it to the state. We then become the leader and a second tx is pre-committed and
-                // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
-                // candidate via applyState prior to the second tx. Since the second tx has already been
-                // pre-committed, when it gets here to commit it will get an IllegalStateException.
-
-                // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
-                // solution will be forthcoming.
-                if(e.getCause() instanceof IllegalStateException) {
-                    LOG.debug("{}: commit failed for transaction {} - retrying as foreign candidate", persistenceId(),
-                            transactionID, e);
-                    store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
-                } else {
-                    throw e;
-                }
-            }
-
-            sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf());
-
-            shardMBean.incrementCommittedTransactionCount();
-            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-
-        } catch (Exception e) {
-            sender.tell(new akka.actor.Status.Failure(e), getSelf());
-
-            LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
-                    transactionID, e);
-            shardMBean.incrementFailedTransactionsCount();
-        } finally {
-            commitCoordinator.currentTransactionComplete(transactionID, true);
-        }
-    }
-
-    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) {
-        // With persistence enabled, this method is called via applyState by the leader strategy
-        // after the commit has been replicated to a majority of the followers.
-
-        CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
-        if (cohortEntry == null) {
-            // The transaction is no longer the current commit. This can happen if the transaction
-            // was aborted prior, most likely due to timeout in the front-end. We need to finish
-            // committing the transaction though since it was successfully persisted and replicated
-            // however we can't use the original cohort b/c it was already preCommitted and may
-            // conflict with the current commit or may have been aborted so we commit with a new
-            // transaction.
-            cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
-            if(cohortEntry != null) {
-                try {
-                    store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
-                } catch (DataValidationFailedException e) {
-                    shardMBean.incrementFailedTransactionsCount();
-                    LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
-                }
-
-                sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
-                        getSelf());
-            } else {
-                // This really shouldn't happen - it likely means that persistence or replication
-                // took so long to complete such that the cohort entry was expired from the cache.
-                IllegalStateException ex = new IllegalStateException(
-                        String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
-                                persistenceId(), transactionID));
-                LOG.error(ex.getMessage());
-                sender.tell(new akka.actor.Status.Failure(ex), getSelf());
-            }
-        } else {
-            finishCommit(sender, transactionID, cohortEntry);
-        }
-    }
-
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
 
@@ -462,7 +364,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
+    protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
         try {
             commitCoordinator.handleBatchedModifications(batched, sender, this);
         } catch (Exception e) {
@@ -472,7 +374,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleBatchedModifications(BatchedModifications batched) {
+    private void handleBatchedModifications(final BatchedModifications batched) {
         // This message is sent to prepare the modifications transaction directly on the Shard as an
         // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
         // BatchedModifications message, the caller sets the ready flag in the message indicating
@@ -504,15 +406,15 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
                         newModifications.size(), leader);
 
-                for(BatchedModifications bm: newModifications) {
+                for (BatchedModifications bm : newModifications) {
                     leader.forward(bm, getContext());
                 }
             }
         }
     }
 
-    private boolean failIfIsolatedLeader(ActorRef sender) {
-        if(isIsolatedLeader()) {
+    private boolean failIfIsolatedLeader(final ActorRef sender) {
+        if (isIsolatedLeader()) {
             sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
                     "Shard %s was the leader but has lost contact with all of its followers. Either all" +
                     " other follower nodes are down or this node is isolated by a network partition.",
@@ -552,13 +454,12 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
+    private void handleForwardedReadyTransaction(final ForwardedReadyTransaction forwardedReady) {
         LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID());
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
-            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
-                    store.getSchemaContext());
+            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
@@ -598,9 +499,9 @@ public class Shard extends RaftActor {
         store.closeTransactionChain(closeTransactionChain.getIdentifier());
     }
 
-    private void createTransaction(CreateTransaction createTransaction) {
+    private void createTransaction(final CreateTransaction createTransaction) {
         try {
-            if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
+            if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
                     failIfIsolatedLeader(getSender())) {
                 return;
             }
@@ -615,25 +516,12 @@ public class Shard extends RaftActor {
         }
     }
 
-    private ActorRef createTransaction(int transactionType, TransactionIdentifier transactionId) {
+    private ActorRef createTransaction(final int transactionType, final TransactionIdentifier transactionId) {
         LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
         return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
             transactionId);
     }
 
-    private void commitWithNewTransaction(final BatchedModifications modification) {
-        ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID());
-        modification.apply(tx.getSnapshot());
-        try {
-            snapshotCohort.syncCommitTransaction(tx);
-            shardMBean.incrementCommittedTransactionCount();
-            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-        } catch (Exception e) {
-            shardMBean.incrementFailedTransactionsCount();
-            LOG.error("{}: Failed to commit", persistenceId(), e);
-        }
-    }
-
     private void updateSchemaContext(final UpdateSchemaContext message) {
         updateSchemaContext(message.getSchemaContext());
     }
@@ -669,7 +557,7 @@ public class Shard extends RaftActor {
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
         // Being paranoid here - this method should only be called once but just in case...
-        if(txCommitTimeoutCheckSchedule == null) {
+        if (txCommitTimeoutCheckSchedule == null) {
             // Schedule a message to be periodically sent to check if the current in-progress
             // transaction should be expired and aborted.
             FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
@@ -685,13 +573,13 @@ public class Shard extends RaftActor {
             if (clientActor == null) {
                 // No clientActor indicates a replica coming from the leader
                 try {
-                    store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue());
+                    store.applyStateFromLeader(identifier, (DataTreeCandidateSupplier)data);
                 } catch (DataValidationFailedException | IOException e) {
                     LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
                 }
             } else {
                 // Replication consensus reached, proceed to commit
-                finishCommit(clientActor, identifier);
+                store.payloadReplicationComplete(identifier, (DataTreeCandidateSupplier)data);
             }
         } else {
             LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
@@ -699,25 +587,6 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) {
-        if(modification == null) {
-            LOG.error(
-                    "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                    persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
-        } else if(clientActor == null) {
-            // There's no clientActor to which to send a commit reply so we must be applying
-            // replicated state from the leader.
-
-            // The only implementation we know of is BatchedModifications, which also carries a transaction
-            // identifier -- which we really need that.
-            Preconditions.checkArgument(modification instanceof BatchedModifications);
-            commitWithNewTransaction((BatchedModifications)modification);
-        } else {
-            // This must be the OK to commit after replication consensus.
-            finishCommit(clientActor, identifier);
-        }
-    }
-
     @Override
     protected void onStateChanged() {
         boolean isLeader = isLeader();
@@ -727,7 +596,7 @@ public class Shard extends RaftActor {
 
         // If this actor is no longer the leader close all the transaction chains
         if (!isLeader) {
-            if(LOG.isDebugEnabled()) {
+            if (LOG.isDebugEnabled()) {
                 LOG.debug(
                     "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
                     persistenceId(), getId());
@@ -736,29 +605,29 @@ public class Shard extends RaftActor {
             store.closeAllTransactionChains();
         }
 
-        if(hasLeader && !isIsolatedLeader()) {
+        if (hasLeader && !isIsolatedLeader()) {
             messageRetrySupport.retryMessages();
         }
     }
 
     @Override
-    protected void onLeaderChanged(String oldLeader, String newLeader) {
+    protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
 
         boolean hasLeader = hasLeader();
-        if(hasLeader && !isLeader()) {
+        if (hasLeader && !isLeader()) {
             // Another leader was elected. If we were the previous leader and had pending transactions, convert
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
-            if(leader != null) {
-                Collection<Object> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
-                        datastoreContext.getShardBatchedModificationCount());
+            if (leader != null) {
+                Collection<?> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
+                            datastoreContext.getShardBatchedModificationCount());
 
-                if(!messagesToForward.isEmpty()) {
+                if (!messagesToForward.isEmpty()) {
                     LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
                             messagesToForward.size(), leader);
 
-                    for(Object message: messagesToForward) {
+                    for (Object message : messagesToForward) {
                         leader.tell(message, self());
                     }
                 }
@@ -769,15 +638,15 @@ public class Shard extends RaftActor {
             }
         }
 
-        if(hasLeader && !isIsolatedLeader()) {
+        if (hasLeader && !isIsolatedLeader()) {
             messageRetrySupport.retryMessages();
         }
     }
 
     @Override
-    protected void pauseLeader(Runnable operation) {
+    protected void pauseLeader(final Runnable operation) {
         LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
-        commitCoordinator.setRunOnPendingTransactionsComplete(operation);
+        store.setRunOnPendingTransactionsComplete(operation);
     }
 
     @Override
@@ -815,9 +684,10 @@ public class Shard extends RaftActor {
         private DatastoreContext datastoreContext;
         private SchemaContext schemaContext;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
+        private TipProducingDataTree dataTree;
         private volatile boolean sealed;
 
-        protected AbstractBuilder(Class<S> shardClass) {
+        protected AbstractBuilder(final Class<S> shardClass) {
             this.shardClass = shardClass;
         }
 
@@ -830,36 +700,42 @@ public class Shard extends RaftActor {
             return (T) this;
         }
 
-        public T id(ShardIdentifier id) {
+        public T id(final ShardIdentifier id) {
             checkSealed();
             this.id = id;
             return self();
         }
 
-        public T peerAddresses(Map<String, String> peerAddresses) {
+        public T peerAddresses(final Map<String, String> peerAddresses) {
             checkSealed();
             this.peerAddresses = peerAddresses;
             return self();
         }
 
-        public T datastoreContext(DatastoreContext datastoreContext) {
+        public T datastoreContext(final DatastoreContext datastoreContext) {
             checkSealed();
             this.datastoreContext = datastoreContext;
             return self();
         }
 
-        public T schemaContext(SchemaContext schemaContext) {
+        public T schemaContext(final SchemaContext schemaContext) {
             checkSealed();
             this.schemaContext = schemaContext;
             return self();
         }
 
-        public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+        public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
             checkSealed();
             this.restoreFromSnapshot = restoreFromSnapshot;
             return self();
         }
 
+        public T dataTree(final TipProducingDataTree dataTree) {
+            checkSealed();
+            this.dataTree = dataTree;
+            return self();
+        }
+
         public ShardIdentifier getId() {
             return id;
         }
@@ -880,6 +756,10 @@ public class Shard extends RaftActor {
             return restoreFromSnapshot;
         }
 
+        public TipProducingDataTree getDataTree() {
+            return dataTree;
+        }
+
         public TreeType getTreeType() {
             switch (datastoreContext.getLogicalStoreType()) {
             case CONFIGURATION:
@@ -910,4 +790,8 @@ public class Shard extends RaftActor {
             super(Shard.class);
         }
     }
+
+    Ticker ticker() {
+        return Ticker.systemTicker();
+    }
 }
index eb0c04d..b3feadc 100644 (file)
@@ -12,30 +12,33 @@ import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 
 /**
@@ -45,89 +48,48 @@ import org.slf4j.Logger;
  */
 final class ShardCommitCoordinator {
 
-    // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+    // Interface hook for unit tests to replace or decorate the ShardDataTreeCohorts.
+    @VisibleForTesting
     public interface CohortDecorator {
         ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
     }
 
     private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
 
-    private CohortEntry currentCohortEntry;
-
     private final ShardDataTree dataTree;
 
-    private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
-
-    // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
-    // since this should only be accessed on the shard's dispatcher.
-    private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
-
-    private int queueCapacity;
-
     private final Logger log;
 
     private final String name;
 
-    private final long cacheExpiryTimeoutInMillis;
-
-    // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+    // This is a hook for unit tests to replace or decorate the ShardDataTreeCohorts.
+    @VisibleForTesting
     private CohortDecorator cohortDecorator;
 
     private ReadyTransactionReply readyTransactionReply;
 
-    private Runnable runOnPendingTransactionsComplete;
-
-    ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
-            String name) {
-
-        this.queueCapacity = queueCapacity;
+    ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) {
         this.log = log;
         this.name = name;
         this.dataTree = Preconditions.checkNotNull(dataTree);
-        this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
-    }
-
-    int getQueueSize() {
-        return queuedCohortEntries.size();
     }
 
     int getCohortCacheSize() {
         return cohortCache.size();
     }
 
-    void setQueueCapacity(int queueCapacity) {
-        this.queueCapacity = queueCapacity;
+    private String persistenceId() {
+        return dataTree.logContext();
     }
 
-    private ReadyTransactionReply readyTransactionReply(Shard shard) {
-        if(readyTransactionReply == null) {
-            readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
+    private ReadyTransactionReply readyTransactionReply(final ActorRef cohort) {
+        if (readyTransactionReply == null) {
+            readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(cohort));
         }
 
         return readyTransactionReply;
     }
 
-    private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
-        if(queuedCohortEntries.size() < queueCapacity) {
-            queuedCohortEntries.offer(cohortEntry);
-
-            log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
-                    queuedCohortEntries.size());
-
-            return true;
-        } else {
-            cohortCache.remove(cohortEntry.getTransactionID());
-
-            final RuntimeException ex = new RuntimeException(
-                    String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
-                                  " capacity %d has been reached.",
-                                  name, cohortEntry.getTransactionID(), queueCapacity));
-            log.error(ex.getMessage());
-            sender.tell(new Failure(ex), shard.self());
-            return false;
-        }
-    }
-
     /**
      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
@@ -137,21 +99,16 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      * @param schema
      */
-    void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
-            SchemaContext schema) {
+    void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender,
+            final Shard shard) {
         log.debug("{}: Readying transaction {}, client version {}", name,
                 ready.getTransactionID(), ready.getTxnClientVersion());
 
         final ShardDataTreeCohort cohort = ready.getTransaction().ready();
-        final CohortEntry cohortEntry = CohortEntry.createReady(ready.getTransactionID(), cohort, cohortRegistry,
-            schema, ready.getTxnClientVersion());
+        final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
         cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
 
-        if(!queueCohortEntry(cohortEntry, sender, shard)) {
-            return;
-        }
-
-        if(ready.isDoImmediateCommit()) {
+        if (ready.isDoImmediateCommit()) {
             cohortEntry.setDoImmediateCommit(true);
             cohortEntry.setReplySender(sender);
             cohortEntry.setShard(shard);
@@ -159,7 +116,7 @@ final class ShardCommitCoordinator {
         } else {
             // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
             // front-end so send back a ReadyTransactionReply with our actor path.
-            sender.tell(readyTransactionReply(shard), shard.self());
+            sender.tell(readyTransactionReply(shard.self()), shard.self());
         }
     }
 
@@ -172,52 +129,48 @@ final class ShardCommitCoordinator {
      * @param batched the BatchedModifications message to process
      * @param sender the sender of the message
      */
-    void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
+    void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
-        if(cohortEntry == null) {
-            cohortEntry = CohortEntry.createOpen(batched.getTransactionID(),
-                    dataTree.newReadWriteTransaction(batched.getTransactionID()),
-                    cohortRegistry, dataTree.getSchemaContext(), batched.getVersion());
+        if (cohortEntry == null) {
+            cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionID()),
+                batched.getVersion());
             cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         }
 
-        if(log.isDebugEnabled()) {
+        if (log.isDebugEnabled()) {
             log.debug("{}: Applying {} batched modifications for Tx {}", name,
                     batched.getModifications().size(), batched.getTransactionID());
         }
 
         cohortEntry.applyModifications(batched.getModifications());
 
-        if(batched.isReady()) {
-            if(cohortEntry.getLastBatchedModificationsException() != null) {
+        if (batched.isReady()) {
+            if (cohortEntry.getLastBatchedModificationsException() != null) {
                 cohortCache.remove(cohortEntry.getTransactionID());
                 throw cohortEntry.getLastBatchedModificationsException();
             }
 
-            if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+            if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
                 cohortCache.remove(cohortEntry.getTransactionID());
                 throw new IllegalStateException(String.format(
                         "The total number of batched messages received %d does not match the number sent %d",
                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
             }
 
-            if(!queueCohortEntry(cohortEntry, sender, shard)) {
-                return;
-            }
-
-            if(log.isDebugEnabled()) {
+            if (log.isDebugEnabled()) {
                 log.debug("{}: Readying Tx {}, client version {}", name,
                         batched.getTransactionID(), batched.getVersion());
             }
 
-            cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
+            cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
+            cohortEntry.ready(cohortDecorator);
 
-            if(batched.isDoCommitOnReady()) {
+            if (batched.isDoCommitOnReady()) {
                 cohortEntry.setReplySender(sender);
                 cohortEntry.setShard(shard);
                 handleCanCommit(cohortEntry);
             } else {
-                sender.tell(readyTransactionReply(shard), shard.self());
+                sender.tell(readyTransactionReply(shard.self()), shard.self());
             }
         } else {
             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
@@ -233,18 +186,13 @@ final class ShardCommitCoordinator {
      * @param sender the sender of the message
      * @param shard the transaction's shard actor
      */
-    void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
-        final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
-                message.getTransactionID());
-        final CohortEntry cohortEntry = CohortEntry.createReady(message.getTransactionID(), cohort, cohortRegistry,
-            dataTree.getSchemaContext(), DataStoreVersions.CURRENT_VERSION);
+    void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
+        final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionID(),
+            message.getModification());
+        final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
-        if(!queueCohortEntry(cohortEntry, sender, shard)) {
-            return;
-        }
-
         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
 
         if (message.isDoCommitOnReady()) {
@@ -252,14 +200,14 @@ final class ShardCommitCoordinator {
             cohortEntry.setShard(shard);
             handleCanCommit(cohortEntry);
         } else {
-            sender.tell(readyTransactionReply(shard), shard.self());
+            sender.tell(readyTransactionReply(shard.self()), shard.self());
         }
     }
 
     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
             final int maxModificationsPerBatch) {
-        CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
-        if(cohortEntry == null || cohortEntry.getTransaction() == null) {
+        CohortEntry cohortEntry = cohortCache.remove(from.getTransactionID());
+        if (cohortEntry == null || cohortEntry.getTransaction() == null) {
             return Collections.singletonList(from);
         }
 
@@ -269,7 +217,7 @@ final class ShardCommitCoordinator {
         cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
             @Override
             protected BatchedModifications getModifications() {
-                if(newModifications.isEmpty() ||
+                if (newModifications.isEmpty() ||
                         newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
                     newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
                 }
@@ -285,34 +233,30 @@ final class ShardCommitCoordinator {
         return newModifications;
     }
 
-    private void handleCanCommit(CohortEntry cohortEntry) {
-        cohortEntry.updateLastAccessTime();
-
-        if(currentCohortEntry != null) {
-            // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
-            // queue and will get processed after all prior entries complete.
+    private void handleCanCommit(final CohortEntry cohortEntry) {
+        cohortEntry.canCommit(new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionID());
 
-            if(log.isDebugEnabled()) {
-                log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
-                        name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
+                if (cohortEntry.isDoImmediateCommit()) {
+                    doCommit(cohortEntry);
+                } else {
+                    cohortEntry.getReplySender().tell(
+                        CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable(),
+                        cohortEntry.getShard().self());
+                }
             }
 
-            return;
-        }
+            @Override
+            public void onFailure(final Throwable t) {
+                log.debug("{}: An exception occurred during canCommit for {}: {}", name,
+                        cohortEntry.getTransactionID(), t);
 
-        // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
-        // it the current entry and proceed with canCommit.
-        // Purposely checking reference equality here.
-        if(queuedCohortEntries.peek() == cohortEntry) {
-            currentCohortEntry = queuedCohortEntries.poll();
-            doCanCommit(currentCohortEntry);
-        } else {
-            if(log.isDebugEnabled()) {
-                log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
-                        queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
-                                cohortEntry.getTransactionID());
+                cohortCache.remove(cohortEntry.getTransactionID());
+                cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self());
             }
-        }
+        });
     }
 
     /**
@@ -322,15 +266,15 @@ final class ShardCommitCoordinator {
      * @param sender the actor to which to send the response
      * @param shard the transaction's shard actor
      */
-    void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
+    void handleCanCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Lookup the cohort entry that was cached previously (or should have been) by
         // transactionReady (via the ForwardedReadyTransaction message).
         final CohortEntry cohortEntry = cohortCache.get(transactionID);
-        if(cohortEntry == null) {
-            // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
-            // between canCommit and ready and the entry was expired from the cache.
+        if (cohortEntry == null) {
+            // Either canCommit was invoked before ready (shouldn't happen) or a long time passed
+            // between canCommit and ready and the entry was expired from the cache or it was aborted.
             IllegalStateException ex = new IllegalStateException(
-                    String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+                    String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
             log.error(ex.getMessage());
             sender.tell(new Failure(ex), shard.self());
             return;
@@ -342,70 +286,54 @@ final class ShardCommitCoordinator {
         handleCanCommit(cohortEntry);
     }
 
-    private void doCanCommit(final CohortEntry cohortEntry) {
-        boolean canCommit = false;
-        try {
-            canCommit = cohortEntry.canCommit();
-
-            log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
-
-            if(cohortEntry.isDoImmediateCommit()) {
-                if(canCommit) {
-                    doCommit(cohortEntry);
-                } else {
-                    cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
-                                "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
-                }
-            } else {
-                cohortEntry.getReplySender().tell(
-                        canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
-                            CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
-                        cohortEntry.getShard().self());
-            }
-        } catch (Exception e) {
-            log.debug("{}: An exception occurred during canCommit", name, e);
-
-            Throwable failure = e;
-            if(e instanceof ExecutionException) {
-                failure = e.getCause();
-            }
-
-            cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
-        } finally {
-            if(!canCommit) {
-                // Remove the entry from the cache now.
-                currentTransactionComplete(cohortEntry.getTransactionID(), true);
-            }
-        }
-    }
-
-    private boolean doCommit(CohortEntry cohortEntry) {
+    private void doCommit(final CohortEntry cohortEntry) {
         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
 
-        boolean success = false;
-
         // We perform the preCommit phase here atomically with the commit phase. This is an
         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
         // coordination of preCommit across shards in case of failure but preCommit should not
         // normally fail since we ensure only one concurrent 3-phase commit.
+        cohortEntry.preCommit(new FutureCallback<DataTreeCandidate>() {
+            @Override
+            public void onSuccess(final DataTreeCandidate candidate) {
+                finishCommit(cohortEntry.getReplySender(), cohortEntry);
+            }
 
-        try {
-            cohortEntry.preCommit();
+            @Override
+            public void onFailure(final Throwable t) {
+                log.error("{} An exception occurred while preCommitting transaction {}", name,
+                        cohortEntry.getTransactionID(), t);
+
+                cohortCache.remove(cohortEntry.getTransactionID());
+                cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self());
+            }
+        });
+    }
 
-            cohortEntry.getShard().continueCommit(cohortEntry);
+    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+        log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
-            cohortEntry.updateLastAccessTime();
+        cohortEntry.commit(new FutureCallback<UnsignedLong>() {
+            @Override
+            public void onSuccess(final UnsignedLong result) {
+                final TransactionIdentifier txId = cohortEntry.getTransactionID();
+                log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
+                    sender);
 
-            success = true;
-        } catch (Exception e) {
-            log.error("{} An exception occurred while preCommitting transaction {}",
-                    name, cohortEntry.getTransactionID(), e);
-            cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
+                cohortCache.remove(cohortEntry.getTransactionID());
+                sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
+                    cohortEntry.getShard().self());
+            }
 
-            currentTransactionComplete(cohortEntry.getTransactionID(), true);
-        }
+            @Override
+            public void onFailure(final Throwable t) {
+                log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+                        cohortEntry.getTransactionID(), t);
 
-        return success;
+                cohortCache.remove(cohortEntry.getTransactionID());
+                sender.tell(new Failure(t), cohortEntry.getShard().self());
+            }
+        });
     }
 
     /**
@@ -414,39 +342,26 @@ final class ShardCommitCoordinator {
      * @param transactionID the ID of the transaction to commit
      * @param sender the actor to which to send the response
      * @param shard the transaction's shard actor
-     * @return true if the transaction was successfully prepared, false otherwise.
      */
-    boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
-        // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
-        // this transaction.
-        final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry == null) {
-            // We're not the current Tx - the Tx was likely expired b/c it took too long in
-            // between the canCommit and commit messages.
+    void handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
+        final CohortEntry cohortEntry = cohortCache.get(transactionID);
+        if (cohortEntry == null) {
+            // Either a long time passed between canCommit and commit and the entry was expired from the cache
+            // or it was aborted.
             IllegalStateException ex = new IllegalStateException(
-                    String.format("%s: Cannot commit transaction %s - it is not the current transaction",
-                            name, transactionID));
+                    String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
             log.error(ex.getMessage());
             sender.tell(new Failure(ex), shard.self());
-            return false;
+            return;
         }
 
         cohortEntry.setReplySender(sender);
-        return doCommit(cohortEntry);
+        doCommit(cohortEntry);
     }
 
     void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
-        CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry != null) {
-            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
-            // aborted during replication in which case we may still commit locally if replication
-            // succeeds.
-            currentTransactionComplete(transactionID, false);
-        } else {
-            cohortEntry = getAndRemoveCohortEntry(transactionID);
-        }
-
-        if(cohortEntry == null) {
+        CohortEntry cohortEntry = cohortCache.remove(transactionID);
+        if (cohortEntry == null) {
             return;
         }
 
@@ -458,223 +373,107 @@ final class ShardCommitCoordinator {
 
             shard.getShardMBean().incrementAbortTransactionsCount();
 
-            if(sender != null) {
+            if (sender != null) {
                 sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
             }
         } catch (Exception e) {
             log.error("{}: An exception happened during abort", name, e);
 
-            if(sender != null) {
+            if (sender != null) {
                 sender.tell(new Failure(e), self);
             }
         }
     }
 
     void checkForExpiredTransactions(final long timeout, final Shard shard) {
-        CohortEntry cohortEntry = getCurrentCohortEntry();
-        if(cohortEntry != null) {
-            if(cohortEntry.isExpired(timeout)) {
-                log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
-                        name, cohortEntry.getTransactionID(), timeout);
-
-                handleAbort(cohortEntry.getTransactionID(), null, shard);
+        Iterator<CohortEntry> iter = cohortCache.values().iterator();
+        while (iter.hasNext()) {
+            CohortEntry cohortEntry = iter.next();
+            if(cohortEntry.isFailed()) {
+                iter.remove();
             }
         }
-
-        cleanupExpiredCohortEntries();
     }
 
     void abortPendingTransactions(final String reason, final Shard shard) {
-        if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
-            return;
-        }
-
-        List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
+        final Failure failure = new Failure(new RuntimeException(reason));
+        Collection<ShardDataTreeCohort> pending = dataTree.getAndClearPendingTransactions();
 
-        log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
+        log.debug("{}: Aborting {} pending queued transactions", name, pending.size());
 
-        for(CohortEntry cohortEntry: cohortEntries) {
-            if(cohortEntry.getReplySender() != null) {
-                cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+        for (ShardDataTreeCohort cohort : pending) {
+            CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier());
+            if (cohortEntry == null) {
+                continue;
             }
-        }
-    }
-
-    private List<CohortEntry> getAndClearPendingCohortEntries() {
-        List<CohortEntry> cohortEntries = new ArrayList<>();
-
-        if(currentCohortEntry != null) {
-            cohortEntries.add(currentCohortEntry);
-            cohortCache.remove(currentCohortEntry.getTransactionID());
-            currentCohortEntry = null;
-        }
 
-        for(CohortEntry cohortEntry: queuedCohortEntries) {
-            cohortEntries.add(cohortEntry);
-            cohortCache.remove(cohortEntry.getTransactionID());
+            if (cohortEntry.getReplySender() != null) {
+                cohortEntry.getReplySender().tell(failure, shard.self());
+            }
         }
 
-        queuedCohortEntries.clear();
-        return cohortEntries;
+        cohortCache.clear();
     }
 
-    Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
-        if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
-            return Collections.emptyList();
-        }
-
-        Collection<Object> messages = new ArrayList<>();
-        List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
-        for(CohortEntry cohortEntry: cohortEntries) {
-            if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
+    Collection<?> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
+        final Collection<VersionedExternalizableMessage> messages = new ArrayList<>();
+        for (ShardDataTreeCohort cohort : dataTree.getAndClearPendingTransactions()) {
+            CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier());
+            if (cohortEntry == null) {
                 continue;
             }
 
-            final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+            final Deque<BatchedModifications> newMessages = new ArrayDeque<>();
             cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
                 @Override
                 protected BatchedModifications getModifications() {
-                    if(newModifications.isEmpty() ||
-                            newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
-                        newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
-                                cohortEntry.getClientVersion()));
-        }
+                    final BatchedModifications lastBatch = newMessages.peekLast();
 
-                    return newModifications.getLast();
+                    if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) {
+                        return lastBatch;
+                    }
+
+                    // Allocate a new message
+                    final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionID(),
+                        cohortEntry.getClientVersion());
+                    newMessages.add(ret);
+                    return ret;
                 }
             });
 
-            if(!newModifications.isEmpty()) {
-                BatchedModifications last = newModifications.getLast();
-                last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
+            final BatchedModifications last = newMessages.peekLast();
+            if (last != null) {
+                final boolean immediate = cohortEntry.isDoImmediateCommit();
+                last.setDoCommitOnReady(immediate);
                 last.setReady(true);
-                last.setTotalMessagesSent(newModifications.size());
-                messages.addAll(newModifications);
-
-                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
-                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
-                            cohortEntry.getClientVersion()));
-                }
+                last.setTotalMessagesSent(newMessages.size());
 
-                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
-                    messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
-                            cohortEntry.getClientVersion()));
-                }
-            }
-        }
+                messages.addAll(newMessages);
 
-        return messages;
-    }
-
-    /**
-     * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
-     * matches the current entry.
-     *
-     * @param transactionID the ID of the transaction
-     * @return the current CohortEntry or null if the given transaction ID does not match the
-     *         current entry.
-     */
-    CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
-        if(isCurrentTransaction(transactionID)) {
-            return currentCohortEntry;
-        }
-
-        return null;
-    }
-
-    CohortEntry getCurrentCohortEntry() {
-        return currentCohortEntry;
-    }
-
-    CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
-        return cohortCache.remove(transactionID);
-    }
-
-    boolean isCurrentTransaction(Identifier transactionID) {
-        return currentCohortEntry != null &&
-                currentCohortEntry.getTransactionID().equals(transactionID);
-    }
-
-    /**
-     * This method is called when a transaction is complete, successful or not. If the given
-     * given transaction ID matches the current in-progress transaction, the next cohort entry,
-     * if any, is dequeued and processed.
-     *
-     * @param transactionID the ID of the completed transaction
-     * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
-     *        the cache.
-     */
-    void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
-        if(removeCohortEntry) {
-            cohortCache.remove(transactionID);
-        }
-
-        if(isCurrentTransaction(transactionID)) {
-            currentCohortEntry = null;
-
-            log.debug("{}: currentTransactionComplete: {}", name, transactionID);
-
-            maybeProcessNextCohortEntry();
-        }
-    }
-
-    private void maybeProcessNextCohortEntry() {
-        // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
-        // clean out expired entries.
-        final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
-        while(iter.hasNext()) {
-            final CohortEntry next = iter.next();
-            if(next.isReadyToCommit()) {
-                if(currentCohortEntry == null) {
-                    if(log.isDebugEnabled()) {
-                        log.debug("{}: Next entry to canCommit {}", name, next);
+                if (!immediate) {
+                    switch (cohort.getState()) {
+                        case CAN_COMMIT_COMPLETE:
+                        case CAN_COMMIT_PENDING:
+                            messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+                                cohortEntry.getClientVersion()));
+                            break;
+                        case PRE_COMMIT_COMPLETE:
+                        case PRE_COMMIT_PENDING:
+                            messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+                                cohortEntry.getClientVersion()));
+                            break;
+                        default:
+                            break;
                     }
-
-                    iter.remove();
-                    currentCohortEntry = next;
-                    currentCohortEntry.updateLastAccessTime();
-                    doCanCommit(currentCohortEntry);
                 }
-
-                break;
-            } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
-                log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
-                        name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
-            } else if(!next.isAborted()) {
-                break;
             }
-
-            iter.remove();
-            cohortCache.remove(next.getTransactionID());
         }
 
-        maybeRunOperationOnPendingTransactionsComplete();
-    }
-
-    void cleanupExpiredCohortEntries() {
-        maybeProcessNextCohortEntry();
-    }
-
-    void setRunOnPendingTransactionsComplete(Runnable operation) {
-        runOnPendingTransactionsComplete = operation;
-        maybeRunOperationOnPendingTransactionsComplete();
-    }
-
-    private void maybeRunOperationOnPendingTransactionsComplete() {
-        if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
-            log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
-
-            runOnPendingTransactionsComplete.run();
-            runOnPendingTransactionsComplete = null;
-        }
+        return messages;
     }
 
     @VisibleForTesting
-    void setCohortDecorator(CohortDecorator cohortDecorator) {
+    void setCohortDecorator(final CohortDecorator cohortDecorator) {
         this.cohortDecorator = cohortDecorator;
     }
-
-   void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
-        cohortRegistry.process(sender, message);
-    }
 }
index 3296280..89fa8fb 100644 (file)
@@ -7,38 +7,61 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.primitives.UnsignedLong;
+import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 
 /**
  * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
@@ -49,31 +72,60 @@ import org.slf4j.LoggerFactory;
  */
 @NotThreadSafe
 public class ShardDataTree extends ShardDataTreeTransactionParent {
+    private static final class CommitEntry {
+        final SimpleShardDataTreeCohort cohort;
+        long lastAccess;
+
+        CommitEntry(final SimpleShardDataTreeCohort cohort, final long now) {
+            this.cohort = Preconditions.checkNotNull(cohort);
+            lastAccess = now;
+        }
+    }
+
+    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
+    private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
     private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final TipProducingDataTree dataTree;
     private final String logContext;
+    private final Shard shard;
+    private Runnable runOnPendingTransactionsComplete;
+
     private SchemaContext schemaContext;
 
-    public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType,
+    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
             final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
-        dataTree = InMemoryDataTreeFactory.getInstance().create(treeType);
+        this.dataTree = dataTree;
         updateSchemaContext(schemaContext);
 
+        this.shard = Preconditions.checkNotNull(shard);
         this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
         this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
         this.logContext = Preconditions.checkNotNull(logContext);
     }
 
-    public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType) {
-        this(schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
+    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+            final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
+            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
+        this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType),
+                treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
+    }
+
+    @VisibleForTesting
+    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
+        this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
                 new DefaultShardDataChangeListenerPublisher(), "");
     }
 
+    String logContext() {
+        return logContext;
+    }
+
     public TipProducingDataTree getDataTree() {
         return dataTree;
     }
@@ -92,6 +144,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException {
+        // FIXME: purge any outstanding transactions
+
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
 
@@ -191,6 +245,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return new SimpleEntry<>(reg, readCurrentData());
     }
 
+    int getQueueSize() {
+        return pendingTransactions.size();
+    }
+
     void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
@@ -214,7 +272,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
-        return new SimpleShardDataTreeCohort(this, snapshot, transaction.getId());
+
+        return createReadyCohort(transaction.getId(), snapshot);
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
@@ -238,4 +297,320 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         dataTree.commit(candidate);
         return candidate;
     }
+
+    public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
+        Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
+        for(CommitEntry entry: pendingTransactions) {
+            ret.add(entry.cohort);
+        }
+
+        pendingTransactions.clear();
+        return ret;
+    }
+
+    private void processNextTransaction() {
+        while (!pendingTransactions.isEmpty()) {
+            final CommitEntry entry = pendingTransactions.peek();
+            final SimpleShardDataTreeCohort cohort = entry.cohort;
+            final DataTreeModification modification = cohort.getDataTreeModification();
+
+            if(cohort.getState() != State.CAN_COMMIT_PENDING) {
+                break;
+            }
+
+            LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
+            Exception cause;
+            try {
+                dataTree.validate(modification);
+                LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
+                cohort.successfulCanCommit();
+                entry.lastAccess = shard.ticker().read();
+                return;
+            } catch (ConflictingModificationAppliedException e) {
+                LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
+                    e.getPath());
+                cause = new OptimisticLockFailedException("Optimistic lock failed.", e);
+            } catch (DataValidationFailedException e) {
+                LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
+                    e.getPath(), e);
+
+                // For debugging purposes, allow dumping of the modification. Coupled with the above
+                // precondition log, it should allow us to understand what went on.
+                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree);
+                cause = new TransactionCommitFailedException("Data did not pass validation.", e);
+            } catch (Exception e) {
+                LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
+                cause = e;
+            }
+
+            // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one
+            pendingTransactions.poll().cohort.failedCanCommit(cause);
+        }
+
+        maybeRunOperationOnPendingTransactionsComplete();
+    }
+
+    void startCanCommit(final SimpleShardDataTreeCohort cohort) {
+        final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
+        if (!cohort.equals(current)) {
+            LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
+            return;
+        }
+
+        processNextTransaction();
+    }
+
+    private void failPreCommit(final Exception cause) {
+        shard.getShardMBean().incrementFailedTransactionsCount();
+        pendingTransactions.poll().cohort.failedPreCommit(cause);
+        processNextTransaction();
+    }
+
+    void startPreCommit(final SimpleShardDataTreeCohort cohort) {
+        final CommitEntry entry = pendingTransactions.peek();
+        Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
+
+        final SimpleShardDataTreeCohort current = entry.cohort;
+        Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+        final DataTreeCandidateTip candidate;
+        try {
+            candidate = dataTree.prepare(cohort.getDataTreeModification());
+        } catch (Exception e) {
+            failPreCommit(e);
+            return;
+        }
+
+        try {
+            cohort.userPreCommit(candidate);
+        } catch (ExecutionException | TimeoutException e) {
+            failPreCommit(e);
+            return;
+        }
+
+        entry.lastAccess = shard.ticker().read();
+        cohort.successfulPreCommit(candidate);
+    }
+
+    private void failCommit(final Exception cause) {
+        shard.getShardMBean().incrementFailedTransactionsCount();
+        pendingTransactions.poll().cohort.failedCommit(cause);
+        processNextTransaction();
+    }
+
+    private void finishCommit(final SimpleShardDataTreeCohort cohort) {
+        final TransactionIdentifier txId = cohort.getIdentifier();
+        final DataTreeCandidate candidate = cohort.getCandidate();
+
+        LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
+
+        try {
+            try {
+                dataTree.commit(candidate);
+            } catch (IllegalStateException e) {
+                // We may get a "store tree and candidate base differ" IllegalStateException from commit under
+                // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
+                // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
+                // applying it to the state. We then become the leader and a second tx is pre-committed and
+                // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
+                // candidate via applyState prior to the second tx. Since the second tx has already been
+                // pre-committed, when it gets here to commit it will get an IllegalStateException.
+
+                // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
+                // solution will be forthcoming.
+
+                LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e);
+                applyForeignCandidate(txId, candidate);
+            }
+        } catch (Exception e) {
+            failCommit(e);
+            return;
+        }
+
+        shard.getShardMBean().incrementCommittedTransactionCount();
+        shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
+
+        // FIXME: propagate journal index
+
+        pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+
+        LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+        notifyListeners(candidate);
+
+        processNextTransaction();
+    }
+
+    void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
+        final CommitEntry entry = pendingTransactions.peek();
+        Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
+
+        final SimpleShardDataTreeCohort current = entry.cohort;
+        Verify.verify(cohort.equals(current), "Attempted to commit %s while %s is pending", cohort, current);
+
+        if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
+            LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
+            finishCommit(cohort);
+            return;
+        }
+
+        final TransactionIdentifier txId = cohort.getIdentifier();
+        final Payload payload;
+        try {
+            payload = CommitTransactionPayload.create(txId, candidate);
+        } catch (IOException e) {
+            LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
+            pendingTransactions.poll().cohort.failedCommit(e);
+            return;
+        }
+
+        // Once completed, we will continue via payloadReplicationComplete
+        entry.lastAccess = shard.ticker().read();
+        shard.persistPayload(txId, payload);
+        LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
+    }
+
+    private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) {
+        final CommitEntry current = pendingTransactions.peek();
+        if (current == null) {
+            LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+            return;
+        }
+
+        if (!current.cohort.getIdentifier().equals(txId)) {
+            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+                current.cohort.getIdentifier(), txId);
+            return;
+        }
+
+        finishCommit(current.cohort);
+    }
+
+    void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) {
+        // For now we do not care about anything else but transactions
+        Verify.verify(identifier instanceof TransactionIdentifier);
+        payloadReplicationComplete((TransactionIdentifier)identifier, payload);
+    }
+
+    void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
+        cohortRegistry.process(sender, message);
+    }
+
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
+            final DataTreeModification modification) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
+                cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+        pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
+        return cohort;
+    }
+
+    void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload)
+            throws DataValidationFailedException, IOException {
+        applyForeignCandidate(identifier, payload.getCandidate().getValue());
+    }
+
+    void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
+        final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
+        final long now = shard.ticker().read();
+        final CommitEntry currentTx = pendingTransactions.peek();
+        if (currentTx != null && currentTx.lastAccess + timeout < now) {
+            LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
+                    currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
+            boolean processNext = true;
+            switch (currentTx.cohort.getState()) {
+                case CAN_COMMIT_PENDING:
+                    pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
+                    break;
+                case CAN_COMMIT_COMPLETE:
+                    pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+                    break;
+                case PRE_COMMIT_PENDING:
+                    pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException());
+                    break;
+                case PRE_COMMIT_COMPLETE:
+                    // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
+                    //        are ready we should commit the transaction, not abort it. Our current software stack does
+                    //        not allow us to do that consistently, because we persist at the time of commit, hence
+                    //        we can end up in a state where we have pre-committed a transaction, then a leader failover
+                    //        occurred ... the new leader does not see the pre-committed transaction and does not have
+                    //        a running timer. To fix this we really need two persistence events.
+                    //
+                    //        The first one, done at pre-commit time will hold the transaction payload. When consensus
+                    //        is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
+                    //        apply the state in this event.
+                    //
+                    //        The second one, done at commit (or abort) time holds only the transaction identifier and
+                    //        signals to followers that the state should (or should not) be applied.
+                    //
+                    //        In order to make the pre-commit timer working across failovers, though, we need
+                    //        a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
+                    //        restart the timer.
+                    pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+                    break;
+                case COMMIT_PENDING:
+                    LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
+                        currentTx.cohort.getIdentifier());
+                    currentTx.lastAccess = now;
+                    processNext = false;
+                    return;
+                case ABORTED:
+                case COMMITTED:
+                case FAILED:
+                case READY:
+                default:
+                    pendingTransactions.poll();
+            }
+
+            if (processNext) {
+                processNextTransaction();
+            }
+        }
+    }
+
+    void startAbort(final SimpleShardDataTreeCohort cohort) {
+        final Iterator<CommitEntry> it = pendingTransactions.iterator();
+        if (!it.hasNext()) {
+            LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
+            return;
+        }
+
+        // First entry is special, as it may already be committing
+        final CommitEntry first = it.next();
+        if (cohort.equals(first.cohort)) {
+            if (cohort.getState() != State.COMMIT_PENDING) {
+                LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+                    cohort.getIdentifier());
+                pendingTransactions.poll();
+                processNextTransaction();
+            } else {
+                LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+            }
+
+            return;
+        }
+
+        while (it.hasNext()) {
+            final CommitEntry e = it.next();
+            if (cohort.equals(e.cohort)) {
+                LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
+                it.remove();
+                return;
+            }
+        }
+
+        LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+    }
+
+    void setRunOnPendingTransactionsComplete(final Runnable operation) {
+        runOnPendingTransactionsComplete = operation;
+        maybeRunOperationOnPendingTransactionsComplete();
+    }
+
+    private void maybeRunOperationOnPendingTransactionsComplete() {
+      if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+          LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+                  runOnPendingTransactionsComplete);
+
+          runOnPendingTransactionsComplete.run();
+          runOnPendingTransactionsComplete = null;
+      }
+  }
 }
index 4787612..0a3a6ae 100644 (file)
@@ -8,11 +8,29 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
-public abstract class ShardDataTreeCohort {
+public abstract class ShardDataTreeCohort implements Identifiable<TransactionIdentifier> {
+    public enum State {
+        READY,
+        CAN_COMMIT_PENDING,
+        CAN_COMMIT_COMPLETE,
+        PRE_COMMIT_PENDING,
+        PRE_COMMIT_COMPLETE,
+        COMMIT_PENDING,
+
+        ABORTED,
+        COMMITTED,
+        FAILED,
+    }
+
     ShardDataTreeCohort() {
         // Prevent foreign instantiation
     }
@@ -20,15 +38,23 @@ public abstract class ShardDataTreeCohort {
     // FIXME: This leaks internal state generated in preCommit,
     // should be result of canCommit
     abstract DataTreeCandidateTip getCandidate();
+
     abstract DataTreeModification getDataTreeModification();
 
     // FIXME: Should return rebased DataTreeCandidateTip
     @VisibleForTesting
-    public abstract ListenableFuture<Boolean> canCommit();
+    public abstract void canCommit(FutureCallback<Void> callback);
+
     @VisibleForTesting
-    public abstract ListenableFuture<Void> preCommit();
+    public abstract void preCommit(FutureCallback<DataTreeCandidate> callback);
+
     @VisibleForTesting
     public abstract ListenableFuture<Void> abort();
+
     @VisibleForTesting
-    public abstract ListenableFuture<Void> commit();
+    public abstract void commit(FutureCallback<UnsignedLong> callback);
+
+    public abstract boolean isFailed();
+
+    public abstract State getState();
 }
index 7812d70..8bef15b 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
@@ -99,13 +98,6 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     }
 
-    void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction)
-            throws ExecutionException, InterruptedException {
-        ShardDataTreeCohort commitCohort = store.finishTransaction(transaction);
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
-    }
-
     @Override
     public void applySnapshot(final byte[] snapshotBytes) {
         // Since this will be done only on Recovery or when this actor is a Follower
index 2842881..bb016a2 100644 (file)
@@ -7,34 +7,52 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 
-final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Identifiable<TransactionIdentifier> {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
-    private static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
     private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
     private final DataTreeModification transaction;
     private final ShardDataTree dataTree;
     private final TransactionIdentifier transactionId;
+    private final CompositeDataTreeCohort userCohorts;
+
+    private State state = State.READY;
     private DataTreeCandidateTip candidate;
+    private FutureCallback<?> callback;
+    private Exception nextFailure;
 
     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
-            final TransactionIdentifier transactionId) {
+            final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
         this.dataTree = Preconditions.checkNotNull(dataTree);
         this.transaction = Preconditions.checkNotNull(transaction);
         this.transactionId = Preconditions.checkNotNull(transactionId);
+        this.userCohorts = Preconditions.checkNotNull(userCohorts);
+    }
+
+    @Override
+    public TransactionIdentifier getIdentifier() {
+        return transactionId;
     }
 
     @Override
@@ -43,80 +61,162 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     }
 
     @Override
-    public ListenableFuture<Boolean> canCommit() {
-        DataTreeModification modification = getDataTreeModification();
-        try {
-            dataTree.getDataTree().validate(modification);
-            LOG.trace("Transaction {} validated", transaction);
-            return TRUE_FUTURE;
-        }
-        catch (ConflictingModificationAppliedException e) {
-            LOG.warn("Store Tx {}: Conflicting modification for path {}.", transactionId, e.getPath());
-            return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
-        } catch (DataValidationFailedException e) {
-            LOG.warn("Store Tx {}: Data validation failed for path {}.", transactionId, e.getPath(), e);
-
-            // For debugging purposes, allow dumping of the modification. Coupled with the above
-            // precondition log, it should allow us to understand what went on.
-            LOG.debug("Store Tx {}: modifications: {} tree: {}", transactionId, modification, dataTree.getDataTree());
-
-            return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
-        } catch (Exception e) {
-            LOG.warn("Unexpected failure in validation phase", e);
-            return Futures.immediateFailedFuture(e);
+    public DataTreeModification getDataTreeModification() {
+        DataTreeModification dataTreeModification = transaction;
+        if (transaction instanceof PruningDataTreeModification){
+            dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification();
         }
+        return dataTreeModification;
+    }
+
+    private void checkState(State expected) {
+        Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
     }
 
     @Override
-    public ListenableFuture<Void> preCommit() {
-        try {
-            candidate = dataTree.getDataTree().prepare(getDataTreeModification());
-            /*
-             * FIXME: this is the place where we should be interacting with persistence, specifically by invoking
-             *        persist on the candidate (which gives us a Future).
-             */
-            LOG.trace("Transaction {} prepared candidate {}", transaction, candidate);
-            return VOID_FUTURE;
-        } catch (Exception e) {
-            if(LOG.isTraceEnabled()) {
-                LOG.trace("Transaction {} failed to prepare", transaction, e);
-            } else {
-                LOG.error("Transaction failed to prepare", e);
-            }
-            return Futures.immediateFailedFuture(e);
+    public void canCommit(final FutureCallback<Void> callback) {
+        if(state == State.CAN_COMMIT_PENDING) {
+            return;
         }
+
+        checkState(State.READY);
+        this.callback = Preconditions.checkNotNull(callback);
+        state = State.CAN_COMMIT_PENDING;
+        dataTree.startCanCommit(this);
     }
 
     @Override
-    DataTreeModification getDataTreeModification() {
-        DataTreeModification dataTreeModification = transaction;
-        if(transaction instanceof PruningDataTreeModification){
-            dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification();
+    public void preCommit(final FutureCallback<DataTreeCandidate> callback) {
+        checkState(State.CAN_COMMIT_COMPLETE);
+        this.callback = Preconditions.checkNotNull(callback);
+        state = State.PRE_COMMIT_PENDING;
+
+        if (nextFailure == null) {
+            dataTree.startPreCommit(this);
+        } else {
+            failedPreCommit(nextFailure);
         }
-        return dataTreeModification;
     }
 
     @Override
     public ListenableFuture<Void> abort() {
-        // No-op, really
-        return VOID_FUTURE;
+        dataTree.startAbort(this);
+        state = State.ABORTED;
+
+        final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
+        if (!maybeAborts.isPresent()) {
+            return VOID_FUTURE;
+        }
+
+        final Future<Iterable<Object>> aborts = maybeAborts.get();
+        if (aborts.isCompleted()) {
+            return VOID_FUTURE;
+        }
+
+        final SettableFuture<Void> ret = SettableFuture.create();
+        aborts.onComplete(new OnComplete<Iterable<Object>>() {
+            @Override
+            public void onComplete(final Throwable failure, final Iterable<Object> objs) {
+                if (failure != null) {
+                    ret.setException(failure);
+                } else {
+                    ret.set(null);
+                }
+            }
+        }, ExecutionContexts.global());
+
+        return ret;
     }
 
     @Override
-    public ListenableFuture<Void> commit() {
+    public void commit(final FutureCallback<UnsignedLong> callback) {
+        checkState(State.PRE_COMMIT_COMPLETE);
+        this.callback = Preconditions.checkNotNull(callback);
+        state = State.COMMIT_PENDING;
+        dataTree.startCommit(this, candidate);
+    }
+
+    private <T> FutureCallback<T> switchState(final State newState) {
+        @SuppressWarnings("unchecked")
+        final FutureCallback<T> ret = (FutureCallback<T>) this.callback;
+        this.callback = null;
+        LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
+        this.state = newState;
+        return ret;
+    }
+
+    void successfulCanCommit() {
+        switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
+    }
+
+    void failedCanCommit(final Exception cause) {
+        switchState(State.FAILED).onFailure(cause);
+    }
+
+    /**
+     * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
+     * any failure to validate is propagated before we record the transaction.
+     *
+     * @param candidate {@link DataTreeCandidate} under consideration
+     * @throws ExecutionException
+     * @throws TimeoutException
+     */
+    // FIXME: this should be asynchronous
+    void userPreCommit(final DataTreeCandidate candidate) throws ExecutionException, TimeoutException {
+        userCohorts.canCommit(candidate);
+        userCohorts.preCommit();
+    }
+
+    void successfulPreCommit(final DataTreeCandidateTip candidate) {
+        LOG.trace("Transaction {} prepared candidate {}", transaction, candidate);
+        this.candidate = Verify.verifyNotNull(candidate);
+        switchState(State.PRE_COMMIT_COMPLETE).onSuccess(candidate);
+    }
+
+    void failedPreCommit(final Exception cause) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Transaction {} failed to prepare", transaction, cause);
+        } else {
+            LOG.error("Transaction {} failed to prepare", transactionId, cause);
+        }
+
+        userCohorts.abort();
+        switchState(State.FAILED).onFailure(cause);
+    }
+
+    void successfulCommit(final UnsignedLong journalIndex) {
         try {
-            dataTree.getDataTree().commit(candidate);
-        } catch (Exception e) {
-            if(LOG.isTraceEnabled()) {
-                LOG.trace("Transaction {} failed to commit", transaction, e);
-            } else {
-                LOG.error("Transaction failed to commit", e);
-            }
-            return Futures.immediateFailedFuture(e);
+            userCohorts.commit();
+        } catch (TimeoutException | ExecutionException e) {
+            // We are probably dead, depending on what the cohorts end up doing
+            LOG.error("User cohorts failed to commit", e);
         }
 
-        LOG.trace("Transaction {} committed, proceeding to notify", transaction);
-        dataTree.notifyListeners(candidate);
-        return VOID_FUTURE;
+        switchState(State.COMMITTED).onSuccess(journalIndex);
+    }
+
+    void failedCommit(final Exception cause) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Transaction {} failed to commit", transaction, cause);
+        } else {
+            LOG.error("Transaction failed to commit", cause);
+        }
+
+        userCohorts.abort();
+        switchState(State.FAILED).onFailure(cause);
+    }
+
+    @Override
+    public State getState() {
+        return state;
+    }
+
+    void reportFailure(final Exception cause) {
+        this.nextFailure = Preconditions.checkNotNull(cause);
+    }
+
+    @Override
+    public boolean isFailed() {
+        return state == State.FAILED || nextFailure != null;
     }
 }
index a42a628..8469d02 100644 (file)
@@ -13,10 +13,15 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCanCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulPreCommit;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -25,13 +30,15 @@ import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -41,8 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
@@ -73,6 +78,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
@@ -200,135 +206,77 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied));
     }
 
-    protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
-            final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
-            final MutableCompositeModification modification) {
-        return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
-    }
-
-    protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
-            final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
-            final MutableCompositeModification modification,
-            final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
-
-        final ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction(nextTransactionId());
-        tx.getSnapshot().write(path, data);
-        final ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
-
-        modification.addModification(new WriteModification(path, data));
-
-        return cohort;
-    }
-
-    protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
-            final ShardDataTreeCohort actual) {
-        return createDelegatingMockCohort(cohortName, actual, null);
-    }
+    protected TipProducingDataTree createDelegatingMockDataTree() throws Exception {
+        TipProducingDataTree actual = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
+        final TipProducingDataTree mock = mock(TipProducingDataTree.class);
 
-    protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
-            final ShardDataTreeCohort actual,
-            final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
-        final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName);
+        doAnswer(invocation -> {
+            actual.validate(invocation.getArgumentAt(0, DataTreeModification.class));
+            return null;
+        }).when(mock).validate(any(DataTreeModification.class));
 
-        doAnswer(new Answer<ListenableFuture<Boolean>>() {
-            @Override
-            public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
-                return actual.canCommit();
-            }
-        }).when(cohort).canCommit();
+        doAnswer(invocation -> {
+            return actual.prepare(invocation.getArgumentAt(0, DataTreeModification.class));
+        }).when(mock).prepare(any(DataTreeModification.class));
 
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
-                if(preCommit != null) {
-                    return preCommit.apply(actual);
-                } else {
-                    return actual.preCommit();
-                }
-            }
-        }).when(cohort).preCommit();
+        doAnswer(invocation -> {
+            actual.commit(invocation.getArgumentAt(0, DataTreeCandidate.class));
+            return null;
+        }).when(mock).commit(any(DataTreeCandidate.class));
 
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
-                return actual.commit();
-            }
-        }).when(cohort).commit();
+        doAnswer(invocation -> {
+            actual.setSchemaContext(invocation.getArgumentAt(0, SchemaContext.class));
+            return null;
+        }).when(mock).setSchemaContext(any(SchemaContext.class));
 
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
-                return actual.abort();
-            }
-        }).when(cohort).abort();
+        doAnswer(invocation -> {
+            return actual.takeSnapshot();
+        }).when(mock).takeSnapshot();
 
-        doAnswer(new Answer<DataTreeCandidateTip>() {
-            @Override
-            public DataTreeCandidateTip answer(final InvocationOnMock invocation) {
-                return actual.getCandidate();
-            }
-        }).when(cohort).getCandidate();
+        doAnswer(invocation -> {
+            return actual.getRootPath();
+        }).when(mock).getRootPath();
 
-        return cohort;
-    }
-
-    protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort,
-            TransactionIdentifier transactionID, MutableCompositeModification modification,
-            boolean doCommitOnReady) {
-        if(remoteReadWriteTransaction){
-            return prepareForwardedReadyTransaction(cohort, transactionID, CURRENT_VERSION,
-                    doCommitOnReady);
-        } else {
-            setupCohortDecorator(shard, cohort);
-            return prepareBatchedModifications(transactionID, modification, doCommitOnReady);
-        }
+        return mock;
     }
 
     protected ShardDataTreeCohort mockShardDataTreeCohort() {
         ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class);
-        doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-        doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
-        doReturn(Futures.immediateFuture(null)).when(cohort).commit();
-        doReturn(mockCandidate("candidate")).when(cohort).getCandidate();
+        DataTreeCandidate candidate = mockCandidate("candidate");
+        successfulCanCommit(cohort);
+        successfulPreCommit(cohort, candidate);
+        successfulCommit(cohort);
+        doReturn(candidate).when(cohort).getCandidate();
         return cohort;
     }
 
-    static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) {
-        ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class);
-        doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class));
-        doNothing().when(mockParent).abortTransaction(any(AbstractShardDataTreeTransaction.class));
-        return mockParent;
-    }
-
-    protected ForwardedReadyTransaction prepareForwardedReadyTransaction(ShardDataTreeCohort cohort,
-            TransactionIdentifier transactionID, short version, boolean doCommitOnReady) {
-        return new ForwardedReadyTransaction(transactionID, version,
-                new ReadWriteShardDataTreeTransaction(newShardDataTreeTransactionParent(cohort), transactionID,
-                        mock(DataTreeModification.class)), doCommitOnReady);
-    }
-
-    protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort,
-            TransactionIdentifier transactionID, MutableCompositeModification modification) {
-        return prepareReadyTransactionMessage(remoteReadWriteTransaction, shard, cohort, transactionID, modification, false);
-    }
+    protected Map<TransactionIdentifier, CapturingShardDataTreeCohort> setupCohortDecorator(final Shard shard,
+            final TransactionIdentifier... transactionIDs) {
+        final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = new HashMap<>();
+        for(TransactionIdentifier id: transactionIDs) {
+            cohortMap.put(id, new CapturingShardDataTreeCohort());
+        }
 
-    protected void setupCohortDecorator(Shard shard, final ShardDataTreeCohort cohort) {
         shard.getCommitCoordinator().setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() {
             @Override
-            public ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual) {
+            public ShardDataTreeCohort decorate(final Identifier transactionID, final ShardDataTreeCohort actual) {
+                CapturingShardDataTreeCohort cohort = cohortMap.get(transactionID);
+                cohort.setDelegate(actual);
                 return cohort;
             }
         });
+
+        return cohortMap;
     }
 
-    protected BatchedModifications prepareBatchedModifications(TransactionIdentifier transactionID,
-                                                               MutableCompositeModification modification) {
+    protected BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID,
+                                                               final MutableCompositeModification modification) {
         return prepareBatchedModifications(transactionID, modification, false);
     }
 
-    private static BatchedModifications prepareBatchedModifications(TransactionIdentifier transactionID,
-                                                             MutableCompositeModification modification,
-                                                             boolean doCommitOnReady) {
+    protected static BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID,
+                                                             final MutableCompositeModification modification,
+                                                             final boolean doCommitOnReady) {
         final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION);
         batchedModifications.addModification(modification);
         batchedModifications.setReady(true);
@@ -337,6 +285,21 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         return batchedModifications;
     }
 
+    protected static BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID,
+            final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean doCommitOnReady) {
+        final MutableCompositeModification modification = new MutableCompositeModification();
+        modification.addModification(new WriteModification(path, data));
+        return prepareBatchedModifications(transactionID, modification, doCommitOnReady);
+    }
+
+    protected static ForwardedReadyTransaction prepareForwardedReadyTransaction(final TestActorRef<Shard> shard,
+            final TransactionIdentifier transactionID, final YangInstanceIdentifier path,
+            final NormalizedNode<?, ?> data, final boolean doCommitOnReady) {
+        ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
+                newReadWriteTransaction(transactionID);
+        rwTx.getSnapshot().write(path, data);
+        return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, doCommitOnReady);
+    }
 
     public static NormalizedNode<?,?> readStore(final TestActorRef<? extends Shard> shard, final YangInstanceIdentifier id)
             throws ExecutionException, InterruptedException {
@@ -364,9 +327,9 @@ public abstract class AbstractShardTest extends AbstractActorTest{
 
         transaction.getSnapshot().write(id, node);
         final ShardDataTreeCohort cohort = transaction.ready();
-        cohort.canCommit().get();
-        cohort.preCommit().get();
-        cohort.commit();
+        immediateCanCommit(cohort);
+        immediatePreCommit(cohort);
+        immediateCommit(cohort);
     }
 
     public void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
@@ -375,9 +338,9 @@ public abstract class AbstractShardTest extends AbstractActorTest{
 
         transaction.getSnapshot().merge(id, node);
         final ShardDataTreeCohort cohort = transaction.ready();
-        cohort.canCommit().get();
-        cohort.preCommit().get();
-        cohort.commit();
+        immediateCanCommit(cohort);
+        immediatePreCommit(cohort);
+        immediateCommit(cohort);
     }
 
     public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
@@ -478,4 +441,94 @@ public abstract class AbstractShardTest extends AbstractActorTest{
             return delegate.create();
         }
     }
+
+    public static class CapturingShardDataTreeCohort extends ShardDataTreeCohort {
+        private volatile ShardDataTreeCohort delegate;
+        private FutureCallback<Void> canCommit;
+        private FutureCallback<DataTreeCandidate> preCommit;
+        private FutureCallback<UnsignedLong> commit;
+
+        public void setDelegate(ShardDataTreeCohort delegate) {
+            this.delegate = delegate;
+        }
+
+        public FutureCallback<Void> getCanCommit() {
+            assertNotNull("canCommit was not invoked", canCommit);
+            return canCommit;
+        }
+
+        public FutureCallback<DataTreeCandidate> getPreCommit() {
+            assertNotNull("preCommit was not invoked", preCommit);
+            return preCommit;
+        }
+
+        public FutureCallback<UnsignedLong> getCommit() {
+            assertNotNull("commit was not invoked", commit);
+            return commit;
+        }
+
+        @Override
+        public TransactionIdentifier getIdentifier() {
+            return delegate.getIdentifier();
+        }
+
+        @Override
+        DataTreeCandidateTip getCandidate() {
+            return delegate.getCandidate();
+        }
+
+        @Override
+        DataTreeModification getDataTreeModification() {
+            return delegate.getDataTreeModification();
+        }
+
+        @Override
+        public void canCommit(FutureCallback<Void> callback) {
+            canCommit = mockFutureCallback(callback);
+            delegate.canCommit(canCommit);
+        }
+
+        @Override
+        public void preCommit(FutureCallback<DataTreeCandidate> callback) {
+            preCommit = mockFutureCallback(callback);
+            delegate.preCommit(preCommit);
+        }
+
+        @Override
+        public void commit(FutureCallback<UnsignedLong> callback) {
+            commit = mockFutureCallback(callback);
+            delegate.commit(commit);
+        }
+
+        @SuppressWarnings("unchecked")
+        private <T> FutureCallback<T> mockFutureCallback(final FutureCallback<T> actual ) {
+            FutureCallback<T> mock = mock(FutureCallback.class);
+            doAnswer(invocation -> {
+                actual.onFailure(invocation.getArgumentAt(0, Throwable.class));
+                return null;
+            }).when(mock).onFailure(any(Throwable.class));
+
+            doAnswer(invocation -> {
+                actual.onSuccess((T) invocation.getArgumentAt(0, Throwable.class));
+                return null;
+            }).when(mock).onSuccess((T) any(Object.class));
+
+            return mock;
+        }
+
+        @Override
+        public ListenableFuture<Void> abort() {
+            return delegate.abort();
+        }
+
+        @Override
+        public boolean isFailed() {
+            return delegate.isFailed();
+        }
+
+        @Override
+        public State getState() {
+            return delegate.getState();
+        }
+    }
 }
index ac6f801..ffe3226 100644 (file)
@@ -47,7 +47,7 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
     private DataChangeListenerSupport support;
 
     @Before
-    public void setup() {
+    public void setup() throws InterruptedException {
         shard = createShard();
         support = new DataChangeListenerSupport(shard);
     }
@@ -151,8 +151,8 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
         listener.verifyCreatedData(0, innerEntryPath(2, "four"));
     }
 
-    private MockDataChangeListener registerChangeListener(YangInstanceIdentifier path, DataChangeScope scope,
-            int expectedEvents, boolean isLeader) {
+    private MockDataChangeListener registerChangeListener(final YangInstanceIdentifier path, final DataChangeScope scope,
+            final int expectedEvents, final boolean isLeader) {
         MockDataChangeListener listener = new MockDataChangeListener(expectedEvents);
         ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener));
 
@@ -162,6 +162,8 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
 
     private Shard createShard() {
         TestActorRef<Shard> actor = actorFactory.createTestActor(newShardProps());
+        ShardTestKit.waitUntilLeader(actor);
+
         return actor.underlyingActor();
     }
 }
index 9baea72..a11fc6b 100644 (file)
@@ -113,8 +113,8 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
         listener2.verifyNoNotifiedData(innerEntryPath(2, "three"), innerEntryPath(2, "four"));
     }
 
-    private MockDataTreeChangeListener registerChangeListener(YangInstanceIdentifier path,
-            int expectedEvents, boolean isLeader) {
+    private MockDataTreeChangeListener registerChangeListener(final YangInstanceIdentifier path,
+            final int expectedEvents, final boolean isLeader) {
         MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
         ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
         support.onMessage(new RegisterDataTreeChangeListener(path, dclActor, false), isLeader, true);
@@ -123,6 +123,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
 
     private Shard createShard() {
         TestActorRef<Shard> actor = actorFactory.createTestActor(newShardProps());
+        ShardTestKit.waitUntilLeader(actor);
         return actor.underlyingActor();
     }
 }
index 1b2657e..47cc359 100644 (file)
@@ -49,7 +49,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
-import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
@@ -79,9 +79,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 
 public class DistributedDataStoreIntegrationTest {
 
@@ -1158,7 +1160,7 @@ public class DistributedDataStoreIntegrationTest {
     }
 
     @Test
-    public void testRestoreFromDatastoreSnapshot() throws Exception{
+    public void testRestoreFromDatastoreSnapshot() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             String name = "transactionIntegrationTest";
 
@@ -1166,20 +1168,21 @@ public class DistributedDataStoreIntegrationTest {
                     CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
                     CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
 
-            ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL);
+            DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+            dataTree.setSchemaContext(SchemaContextHelper.full());
             AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
-            NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree.getDataTree(),
-                    YangInstanceIdentifier.EMPTY);
+            NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
 
-            Snapshot carsSnapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(root).serialize(),
+            Snapshot carsSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
                     Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
 
             NormalizedNode<?, ?> peopleNode = PeopleModel.create();
-            dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL);
+            dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+            dataTree.setSchemaContext(SchemaContextHelper.full());
             AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
-            root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.EMPTY);
+            root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
 
-            Snapshot peopleSnapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(root).serialize(),
+            Snapshot peopleSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
                     Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
 
             restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMocking.java
new file mode 100644 (file)
index 0000000..dff4162
--- /dev/null
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+public final class ShardDataTreeMocking {
+
+    private ShardDataTreeMocking() {
+        throw new UnsupportedOperationException();
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> FutureCallback<T> mockCallback() {
+        return mock(FutureCallback.class);
+    }
+
+    public static ShardDataTreeCohort immediateCanCommit(final ShardDataTreeCohort cohort) {
+        final FutureCallback<Void> callback = mockCallback();
+        doNothing().when(callback).onSuccess(null);
+        cohort.canCommit(callback);
+
+        verify(callback).onSuccess(null);
+        verifyNoMoreInteractions(callback);
+        return cohort;
+    }
+
+    public static ShardDataTreeCohort immediatePreCommit(final ShardDataTreeCohort cohort) {
+        final FutureCallback<DataTreeCandidate> callback = mockCallback();
+        doNothing().when(callback).onSuccess(any(DataTreeCandidate.class));
+        cohort.preCommit(callback);
+
+        verify(callback).onSuccess(any(DataTreeCandidate.class));
+        verifyNoMoreInteractions(callback);
+        return cohort;
+    }
+
+    public static ShardDataTreeCohort immediateCommit(final ShardDataTreeCohort cohort) {
+        final FutureCallback<UnsignedLong> callback = mockCallback();
+        doNothing().when(callback).onSuccess(any(UnsignedLong.class));
+        cohort.commit(callback);
+
+        verify(callback, timeout(5000)).onSuccess(any(UnsignedLong.class));
+        verifyNoMoreInteractions(callback);
+        return cohort;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> Object invokeSuccess(final InvocationOnMock invocation, final T value) {
+        invocation.getArgumentAt(0, FutureCallback.class).onSuccess(value);
+        return null;
+    }
+
+    private static Object invokeFailure(final InvocationOnMock invocation) {
+        invocation.getArgumentAt(0, FutureCallback.class).onFailure(mock(Exception.class));
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static ShardDataTreeCohort failedCanCommit(final ShardDataTreeCohort mock) {
+        doAnswer(invocation -> {
+            return invokeFailure(invocation);
+        }).when(mock).canCommit(any(FutureCallback.class));
+        return mock;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static ShardDataTreeCohort failedPreCommit(final ShardDataTreeCohort mock) {
+        doAnswer(invocation -> {
+            return invokeFailure(invocation);
+        }).when(mock).preCommit(any(FutureCallback.class));
+        return mock;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static ShardDataTreeCohort failedCommit(final ShardDataTreeCohort mock) {
+        doAnswer(invocation -> {
+            return invokeFailure(invocation);
+        }).when(mock).commit(any(FutureCallback.class));
+        return mock;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static ShardDataTreeCohort successfulCanCommit(final ShardDataTreeCohort mock) {
+        doAnswer(invocation -> {
+            return invokeSuccess(invocation, null);
+        }).when(mock).canCommit(any(FutureCallback.class));
+
+        return mock;
+    }
+
+    public static ShardDataTreeCohort successfulPreCommit(final ShardDataTreeCohort mock) {
+        return successfulPreCommit(mock, mock(DataTreeCandidate.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    public static ShardDataTreeCohort successfulPreCommit(final ShardDataTreeCohort mock, final DataTreeCandidate candidate) {
+        doAnswer(invocation -> {
+            return invokeSuccess(invocation, candidate);
+        }).when(mock).preCommit(any(FutureCallback.class));
+
+        return mock;
+    }
+
+    public static ShardDataTreeCohort successfulCommit(final ShardDataTreeCohort mock) {
+        return successfulCommit(mock, UnsignedLong.ZERO);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static ShardDataTreeCohort successfulCommit(final ShardDataTreeCohort mock, final UnsignedLong index) {
+        doAnswer(invocation -> {
+            return invokeSuccess(invocation, index);
+        }).when(mock).commit(any(FutureCallback.class));
+
+        return mock;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static void assertSequencedCommit(final ShardDataTreeCohort mock) {
+        final InOrder inOrder = inOrder(mock);
+        inOrder.verify(mock).canCommit(any(FutureCallback.class));
+        inOrder.verify(mock).preCommit(any(FutureCallback.class));
+        inOrder.verify(mock).commit(any(FutureCallback.class));
+    }
+}
index d35b2db..5d27224 100644 (file)
@@ -10,13 +10,20 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
 import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -30,31 +37,38 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class ShardDataTreeTest extends AbstractTest {
 
-    SchemaContext fullSchema;
+    private final Shard mockShard = Mockito.mock(Shard.class);
+
+
+    private SchemaContext fullSchema;
 
     @Before
-    public void setUp(){
+    public void setUp() {
+        doReturn(true).when(mockShard).canSkipPayload();
+        doReturn(Ticker.systemTicker()).when(mockShard).ticker();
+        doReturn(Mockito.mock(ShardStats.class)).when(mockShard).getShardMBean();
+
         fullSchema = SchemaContextHelper.full();
     }
 
     @Test
     public void testWrite() throws ExecutionException, InterruptedException {
-        modify(new ShardDataTree(fullSchema, TreeType.OPERATIONAL), false, true, true);
+        modify(new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL), false, true, true);
     }
 
     @Test
     public void testMerge() throws ExecutionException, InterruptedException {
-        modify(new ShardDataTree(fullSchema, TreeType.OPERATIONAL), true, true, true);
+        modify(new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL), true, true, true);
     }
 
 
-    private void modify(ShardDataTree shardDataTree, boolean merge, boolean expectedCarsPresent, boolean expectedPeoplePresent) throws ExecutionException, InterruptedException {
+    private void modify(final ShardDataTree shardDataTree, final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) throws ExecutionException, InterruptedException {
 
         assertEquals(fullSchema, shardDataTree.getSchemaContext());
 
-        ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
+        final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
 
-        DataTreeModification snapshot = transaction.getSnapshot();
+        final DataTreeModification snapshot = transaction.getSnapshot();
 
         assertNotNull(snapshot);
 
@@ -66,21 +80,21 @@ public class ShardDataTreeTest extends AbstractTest {
             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
         }
 
-        ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
-
-        cohort.preCommit().get();
-        cohort.commit().get();
+        final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
 
+        immediateCanCommit(cohort);
+        immediatePreCommit(cohort);
+        immediateCommit(cohort);
 
-        ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId());
+        final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId());
 
-        DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
+        final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
 
-        Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
+        final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
 
         assertEquals(expectedCarsPresent, optional.isPresent());
 
-        Optional<NormalizedNode<?, ?>> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
+        final Optional<NormalizedNode<?, ?>> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
 
         assertEquals(expectedPeoplePresent, optional1.isPresent());
 
@@ -88,52 +102,52 @@ public class ShardDataTreeTest extends AbstractTest {
 
     @Test
     public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException {
-        ShardDataTree shardDataTree = new ShardDataTree(fullSchema, TreeType.OPERATIONAL);
+        final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
 
-        List<DataTreeCandidateTip> candidates = new ArrayList<>();
+        final List<DataTreeCandidateTip> candidates = new ArrayList<>();
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
 
-        NormalizedNode<?, ?> expected = getCars(shardDataTree);
+        final NormalizedNode<?, ?> expected = getCars(shardDataTree);
 
         applyCandidates(shardDataTree, candidates);
 
-        NormalizedNode<?, ?> actual = getCars(shardDataTree);
+        final NormalizedNode<?, ?> actual = getCars(shardDataTree);
 
         assertEquals(expected, actual);
     }
 
     @Test
     public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException {
-        ShardDataTree shardDataTree = new ShardDataTree(fullSchema, TreeType.OPERATIONAL);
+        final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
 
-        List<DataTreeCandidateTip> candidates = new ArrayList<>();
+        final List<DataTreeCandidateTip> candidates = new ArrayList<>();
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
         candidates.add(addCar(shardDataTree));
         candidates.add(removeCar(shardDataTree));
 
-        NormalizedNode<?, ?> expected = getCars(shardDataTree);
+        final NormalizedNode<?, ?> expected = getCars(shardDataTree);
 
         applyCandidates(shardDataTree, candidates);
 
-        NormalizedNode<?, ?> actual = getCars(shardDataTree);
+        final NormalizedNode<?, ?> actual = getCars(shardDataTree);
 
         assertEquals(expected, actual);
     }
 
-    private static NormalizedNode<?, ?> getCars(ShardDataTree shardDataTree) {
-        ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId());
-        DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
+    private static NormalizedNode<?, ?> getCars(final ShardDataTree shardDataTree) {
+        final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId());
+        final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
 
-        Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
+        final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
 
         assertEquals(true, optional.isPresent());
 
         return optional.get();
     }
 
-    private static DataTreeCandidateTip addCar(ShardDataTree shardDataTree) throws ExecutionException, InterruptedException {
+    private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree) throws ExecutionException, InterruptedException {
         return doTransaction(shardDataTree, snapshot -> {
                 snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
                 snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
@@ -141,7 +155,7 @@ public class ShardDataTreeTest extends AbstractTest {
             });
     }
 
-    private static DataTreeCandidateTip removeCar(ShardDataTree shardDataTree) throws ExecutionException, InterruptedException {
+    private static DataTreeCandidateTip removeCar(final ShardDataTree shardDataTree) throws ExecutionException, InterruptedException {
         return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
     }
 
@@ -150,34 +164,34 @@ public class ShardDataTreeTest extends AbstractTest {
         void execute(DataTreeModification snapshot);
     }
 
-    private static DataTreeCandidateTip doTransaction(ShardDataTree shardDataTree, DataTreeOperation operation)
+    private static DataTreeCandidateTip doTransaction(final ShardDataTree shardDataTree, final DataTreeOperation operation)
             throws ExecutionException, InterruptedException {
-        ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
-        DataTreeModification snapshot = transaction.getSnapshot();
+        final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
+        final DataTreeModification snapshot = transaction.getSnapshot();
         operation.execute(snapshot);
-        ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+        final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
 
-        cohort.canCommit().get();
-        cohort.preCommit().get();
-        DataTreeCandidateTip candidate = cohort.getCandidate();
-        cohort.commit().get();
+        immediateCanCommit(cohort);
+        immediatePreCommit(cohort);
+        final DataTreeCandidateTip candidate = cohort.getCandidate();
+        immediateCommit(cohort);
 
         return candidate;
     }
 
-    private static DataTreeCandidateTip applyCandidates(ShardDataTree shardDataTree, List<DataTreeCandidateTip> candidates)
+    private static DataTreeCandidateTip applyCandidates(final ShardDataTree shardDataTree, final List<DataTreeCandidateTip> candidates)
             throws ExecutionException, InterruptedException {
-        ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
-        DataTreeModification snapshot = transaction.getSnapshot();
-        for(DataTreeCandidateTip candidateTip : candidates){
+        final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
+        final DataTreeModification snapshot = transaction.getSnapshot();
+        for(final DataTreeCandidateTip candidateTip : candidates){
             DataTreeCandidates.applyToModification(snapshot, candidateTip);
         }
-        ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+        final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
 
-        cohort.canCommit().get();
-        cohort.preCommit().get();
-        DataTreeCandidateTip candidate = cohort.getCandidate();
-        cohort.commit().get();
+        immediateCanCommit(cohort);
+        immediatePreCommit(cohort);
+        final DataTreeCandidateTip candidate = cohort.getCandidate();
+        immediateCommit(cohort);
 
         return candidate;
     }
index 1ceb2c7..acac104 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.base.Optional;
 import java.io.IOException;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
@@ -42,7 +43,9 @@ public class ShardRecoveryCoordinatorTest extends AbstractTest {
         peopleSchemaContext = SchemaContextHelper.select(SchemaContextHelper.PEOPLE_YANG);
         carsSchemaContext = SchemaContextHelper.select(SchemaContextHelper.CARS_YANG);
 
-        peopleDataTree = new ShardDataTree(peopleSchemaContext, TreeType.OPERATIONAL);
+        final Shard mockShard = Mockito.mock(Shard.class);
+
+        peopleDataTree = new ShardDataTree(mockShard, peopleSchemaContext, TreeType.OPERATIONAL);
     }
 
     @Deprecated
index 8f7dd8e..d4dcc9c 100644 (file)
@@ -14,11 +14,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -31,14 +30,13 @@ import akka.pattern.Patterns;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
-import com.google.common.base.Function;
 import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -64,6 +62,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
@@ -75,11 +74,10 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeCh
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
@@ -90,6 +88,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
@@ -101,6 +100,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftStat
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -112,11 +112,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@@ -411,7 +410,8 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testApplySnapshot() throws Exception {
 
-        final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
+        final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps().
+                withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
 
         ShardTestKit.waitUntilLeader(shard);
 
@@ -428,14 +428,24 @@ public class ShardTest extends AbstractShardTest {
         final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
         final NormalizedNode<?,?> expected = readStore(store, root);
 
-        final Snapshot snapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(expected).serialize(),
+        final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(expected).serialize(),
                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
 
-        shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
+        shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
 
-        final NormalizedNode<?,?> actual = readStore(shard, root);
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
 
-        assertEquals("Root node", expected, actual);
+            try {
+                assertEquals("Root node", expected, readStore(shard, root));
+                return;
+            } catch(AssertionError e) {
+                // try again
+            }
+        }
+
+        fail("Snapshot was not applied");
     }
 
     @Test
@@ -518,35 +528,22 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
-
-            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
             final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification1 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
             final TransactionIdentifier transactionID2 = nextTransactionId();
-            final MutableCompositeModification modification2 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
-                    TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
-                    modification2);
-
             final TransactionIdentifier transactionID3 = nextTransactionId();
-            final MutableCompositeModification modification3 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
-                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
-                    modification3);
+
+            Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
+                    shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
+            final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
+            final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
+            final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
 
             final long timeoutSec = 5;
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+            shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.class));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -558,10 +555,15 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+            // Ready 2 more Tx's.
+
+            shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
+            shard.tell(prepareBatchedModifications(transactionID3, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
@@ -655,16 +657,18 @@ public class ShardTest extends AbstractShardTest {
 
             assertEquals("Commits complete", true, done);
 
-            final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
-            inOrder.verify(cohort1).canCommit();
-            inOrder.verify(cohort1).preCommit();
-            inOrder.verify(cohort1).commit();
-            inOrder.verify(cohort2).canCommit();
-            inOrder.verify(cohort2).preCommit();
-            inOrder.verify(cohort2).commit();
-            inOrder.verify(cohort3).canCommit();
-            inOrder.verify(cohort3).preCommit();
-            inOrder.verify(cohort3).commit();
+            final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
+                    cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
+                    cohort3.getPreCommit(), cohort3.getCommit());
+            inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
+            inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+            inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
+            inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
+            inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+            inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
+            inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
+            inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+            inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
 
             // Verify data in the data store.
 
@@ -686,17 +690,6 @@ public class ShardTest extends AbstractShardTest {
             final TransactionIdentifier transactionID = nextTransactionId();
             final FiniteDuration duration = duration("5 seconds");
 
-            final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
-            final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
-                if(mockCohort.get() == null) {
-                    mockCohort.set(createDelegatingMockCohort("cohort", actual));
-                }
-
-                return mockCohort.get();
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             // Send a BatchedModifications to start a transaction.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
@@ -721,16 +714,11 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send the CanCommitTransaction message.
+            // Send the CommitTransaction message.
 
             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.class);
 
-            final InOrder inOrder = inOrder(mockCohort.get());
-            inOrder.verify(mockCohort.get()).canCommit();
-            inOrder.verify(mockCohort.get()).preCommit();
-            inOrder.verify(mockCohort.get()).commit();
-
             // Verify data in the data store.
 
             verifyOuterListEntry(shard, 1);
@@ -749,17 +737,6 @@ public class ShardTest extends AbstractShardTest {
             final TransactionIdentifier transactionID = nextTransactionId();
             final FiniteDuration duration = duration("5 seconds");
 
-            final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
-            final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
-                if(mockCohort.get() == null) {
-                    mockCohort.set(createDelegatingMockCohort("cohort", actual));
-                }
-
-                return mockCohort.get();
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             // Send a BatchedModifications to start a transaction.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
@@ -778,11 +755,6 @@ public class ShardTest extends AbstractShardTest {
 
             expectMsgClass(duration, CommitTransactionReply.class);
 
-            final InOrder inOrder = inOrder(mockCohort.get());
-            inOrder.verify(mockCohort.get()).canCommit();
-            inOrder.verify(mockCohort.get()).preCommit();
-            inOrder.verify(mockCohort.get()).commit();
-
             // Verify data in the data store.
 
             verifyOuterListEntry(shard, 1);
@@ -956,8 +928,8 @@ public class ShardTest extends AbstractShardTest {
             Failure failure = expectMsgClass(Failure.class);
             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
 
-            shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), txId,
-                    DataStoreVersions.CURRENT_VERSION, true), getRef());
+            shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
             failure = expectMsgClass(Failure.class);
             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
 
@@ -985,24 +957,16 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
             final TransactionIdentifier transactionID = nextTransactionId();
-            final MutableCompositeModification modification = new MutableCompositeModification();
             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
-                    TestModel.TEST_PATH, containerNode, modification);
-
-            final FiniteDuration duration = duration("5 seconds");
-
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
-
-            expectMsgClass(duration, CommitTransactionReply.class);
+            if(readWrite) {
+                shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
+                        containerNode, true), getRef());
+            } else {
+                shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
+            }
 
-            final InOrder inOrder = inOrder(cohort);
-            inOrder.verify(cohort).canCommit();
-            inOrder.verify(cohort).preCommit();
-            inOrder.verify(cohort).commit();
+            expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
 
             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
@@ -1085,36 +1049,21 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
-        testCommitWithPersistenceDisabled(true);
-    }
-
-    @Test
-    public void testWriteOnlyCommitWithPersistenceDisabled() throws Throwable {
-        testCommitWithPersistenceDisabled(true);
-    }
-
-    private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.persistent(false);
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCommitWithPersistenceDisabled-" + readWrite);
+                    "testCommitWithPersistenceDisabled");
 
             waitUntilLeader(shard);
 
-            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
             // Setup a simulated transactions with a mock cohort.
 
-            final TransactionIdentifier transactionID = nextTransactionId();
-            final MutableCompositeModification modification = new MutableCompositeModification();
-            final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
-                TestModel.TEST_PATH, containerNode, modification);
-
             final FiniteDuration duration = duration("5 seconds");
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+            final TransactionIdentifier transactionID = nextTransactionId();
+            final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1129,11 +1078,6 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.class);
 
-            final InOrder inOrder = inOrder(cohort);
-            inOrder.verify(cohort).canCommit();
-            inOrder.verify(cohort).preCommit();
-            inOrder.verify(cohort).commit();
-
             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
         }};
@@ -1152,153 +1096,134 @@ public class ShardTest extends AbstractShardTest {
     private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
         // Note that persistence is enabled which would normally result in the entry getting written to the journal
         // but here that need not happen
-        new ShardTestKit(getSystem()) {
-            {
-                final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testCommitWhenTransactionHasNoModifications-" + readWrite);
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCommitWhenTransactionHasNoModifications-" + readWrite);
 
-                waitUntilLeader(shard);
+            waitUntilLeader(shard);
 
-                final TransactionIdentifier transactionID = nextTransactionId();
-                final MutableCompositeModification modification = new MutableCompositeModification();
-                final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
-                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
-                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
-                doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
+            final TransactionIdentifier transactionID = nextTransactionId();
 
-                final FiniteDuration duration = duration("5 seconds");
+            final FiniteDuration duration = duration("5 seconds");
 
-                shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.class);
+            if(readWrite) {
+                ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
+                        newReadWriteTransaction(transactionID);
+                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
+            } else {
+                shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()), getRef());
+            }
 
-                // Send the CanCommitTransaction message.
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-                shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-                final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                        expectMsgClass(duration, CanCommitTransactionReply.class));
-                assertEquals("Can commit", true, canCommitReply.getCanCommit());
+            // Send the CanCommitTransaction message.
 
-                shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-                expectMsgClass(duration, CommitTransactionReply.class);
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-                final InOrder inOrder = inOrder(cohort);
-                inOrder.verify(cohort).canCommit();
-                inOrder.verify(cohort).preCommit();
-                inOrder.verify(cohort).commit();
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
-                shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
-                final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+            shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+            final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
 
-                // Use MBean for verification
-                // Committed transaction count should increase as usual
-                assertEquals(1,shardStats.getCommittedTransactionsCount());
+            // Use MBean for verification
+            // Committed transaction count should increase as usual
+            assertEquals(1,shardStats.getCommittedTransactionsCount());
 
-                // Commit index should not advance because this does not go into the journal
-                assertEquals(-1, shardStats.getCommitIndex());
-            }
-        };
+            // Commit index should not advance because this does not go into the journal
+            assertEquals(-1, shardStats.getCommitIndex());
+        }};
     }
 
     @Test
-    public void testReadWriteCommitWhenTransactionHasModifications() {
+    public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
         testCommitWhenTransactionHasModifications(true);
     }
 
     @Test
-    public void testWriteOnlyCommitWhenTransactionHasModifications() {
+    public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
         testCommitWhenTransactionHasModifications(false);
     }
 
-    private void testCommitWhenTransactionHasModifications(final boolean readWrite){
-        new ShardTestKit(getSystem()) {
-            {
-                final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        "testCommitWhenTransactionHasModifications-" + readWrite);
+    private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
+        new ShardTestKit(getSystem()) {{
+            final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCommitWhenTransactionHasModifications-" + readWrite);
 
-                waitUntilLeader(shard);
+            waitUntilLeader(shard);
 
-                final TransactionIdentifier transactionID = nextTransactionId();
-                final MutableCompositeModification modification = new MutableCompositeModification();
-                modification.addModification(new DeleteModification(YangInstanceIdentifier.EMPTY));
-                final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
-                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
-                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
-                doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
+            final FiniteDuration duration = duration("5 seconds");
+            final TransactionIdentifier transactionID = nextTransactionId();
 
-                final FiniteDuration duration = duration("5 seconds");
+            if(readWrite) {
+                shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+            } else {
+                shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+            }
 
-                shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-                // Send the CanCommitTransaction message.
+            // Send the CanCommitTransaction message.
 
-                shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-                final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                        expectMsgClass(duration, CanCommitTransactionReply.class));
-                assertEquals("Can commit", true, canCommitReply.getCanCommit());
+            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.class));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-                shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-                expectMsgClass(duration, CommitTransactionReply.class);
+            shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
 
-                final InOrder inOrder = inOrder(cohort);
-                inOrder.verify(cohort).canCommit();
-                inOrder.verify(cohort).preCommit();
-                inOrder.verify(cohort).commit();
+            final InOrder inOrder = inOrder(dataTree);
+            inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+            inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+            inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
 
-                shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
-                final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+            shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+            final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
 
-                // Use MBean for verification
-                // Committed transaction count should increase as usual
-                assertEquals(1, shardStats.getCommittedTransactionsCount());
+            // Use MBean for verification
+            // Committed transaction count should increase as usual
+            assertEquals(1, shardStats.getCommittedTransactionsCount());
 
-                // Commit index should advance as we do not have an empty modification
-                assertEquals(0, shardStats.getCommitIndex());
-            }
-        };
+            // Commit index should advance as we do not have an empty modification
+            assertEquals(0, shardStats.getCommitIndex());
+        }};
     }
 
     @Test
     public void testCommitPhaseFailure() throws Throwable {
-        testCommitPhaseFailure(true);
-        testCommitPhaseFailure(false);
-    }
-
-    private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
+            final TipProducingDataTree dataTree = createDelegatingMockDataTree();
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCommitPhaseFailure-" + readWrite);
+                    newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCommitPhaseFailure");
 
             waitUntilLeader(shard);
 
+            final FiniteDuration duration = duration("5 seconds");
+            final Timeout timeout = new Timeout(duration);
+
             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
             // commit phase.
 
-            final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification1 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
-            doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
-            doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
-            doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
-
-            final TransactionIdentifier transactionID2 = nextTransactionId();
-            final MutableCompositeModification modification2 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
-            final FiniteDuration duration = duration("5 seconds");
-            final Timeout timeout = new Timeout(duration);
+            doThrow(new RuntimeException("mock commit failure")).when(dataTree).commit(any(DataTreeCandidate.class));
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+            final TransactionIdentifier transactionID1 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+            final TransactionIdentifier transactionID2 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1332,46 +1257,37 @@ public class ShardTest extends AbstractShardTest {
 
             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
 
-            final InOrder inOrder = inOrder(cohort1, cohort2);
-            inOrder.verify(cohort1).canCommit();
-            inOrder.verify(cohort1).preCommit();
-            inOrder.verify(cohort1).commit();
-            inOrder.verify(cohort2).canCommit();
+            final InOrder inOrder = inOrder(dataTree);
+            inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+            inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+            inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
+            inOrder.verify(dataTree).validate(any(DataTreeModification.class));
         }};
     }
 
     @Test
     public void testPreCommitPhaseFailure() throws Throwable {
-        testPreCommitPhaseFailure(true);
-        testPreCommitPhaseFailure(false);
-    }
-
-    private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
+            final TipProducingDataTree dataTree = createDelegatingMockDataTree();
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testPreCommitPhaseFailure-" + readWrite);
+                    newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testPreCommitPhaseFailure");
 
             waitUntilLeader(shard);
 
-            final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification1 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
-            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
-
-            final TransactionIdentifier transactionID2 = nextTransactionId();
-            final MutableCompositeModification modification2 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+            doThrow(new RuntimeException("mock preCommit failure")).when(dataTree).prepare(any(DataTreeModification.class));
+
+            final TransactionIdentifier transactionID1 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+            final TransactionIdentifier transactionID2 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1405,35 +1321,31 @@ public class ShardTest extends AbstractShardTest {
 
             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
 
-            final InOrder inOrder = inOrder(cohort1, cohort2);
-            inOrder.verify(cohort1).canCommit();
-            inOrder.verify(cohort1).preCommit();
-            inOrder.verify(cohort2).canCommit();
+            final InOrder inOrder = inOrder(dataTree);
+            inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+            inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+            inOrder.verify(dataTree).validate(any(DataTreeModification.class));
         }};
     }
 
     @Test
     public void testCanCommitPhaseFailure() throws Throwable {
-        testCanCommitPhaseFailure(true);
-        testCanCommitPhaseFailure(false);
-    }
-
-    private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
+            final TipProducingDataTree dataTree = createDelegatingMockDataTree();
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCanCommitPhaseFailure-" + readWrite);
+                    newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCanCommitPhaseFailure");
 
             waitUntilLeader(shard);
 
             final FiniteDuration duration = duration("5 seconds");
-
             final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
-            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
+            doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")).
+                doNothing().when(dataTree).validate(any(DataTreeModification.class));
+
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -1443,12 +1355,9 @@ public class ShardTest extends AbstractShardTest {
 
             // Send another can commit to ensure the failed one got cleaned up.
 
-            reset(cohort);
-
             final TransactionIdentifier transactionID2 = nextTransactionId();
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
+            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
@@ -1458,54 +1367,6 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
-    @Test
-    public void testCanCommitPhaseFalseResponse() throws Throwable {
-        testCanCommitPhaseFalseResponse(true);
-        testCanCommitPhaseFalseResponse(false);
-    }
-
-    private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
-        new ShardTestKit(getSystem()) {{
-            final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCanCommitPhaseFalseResponse-" + readWrite);
-
-            waitUntilLeader(shard);
-
-            final FiniteDuration duration = duration("5 seconds");
-
-            final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
-            doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
-
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.class);
-
-            // Send the CanCommitTransaction message.
-
-            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
-            CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.class));
-            assertEquals("getCanCommit", false, reply.getCanCommit());
-
-            // Send another can commit to ensure the failed one got cleaned up.
-
-            reset(cohort);
-
-            final TransactionIdentifier transactionID2 = nextTransactionId();
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.class);
-
-            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
-            reply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(CanCommitTransactionReply.class));
-            assertEquals("getCanCommit", true, reply.getCanCommit());
-        }};
-    }
-
     @Test
     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
         testImmediateCommitWithCanCommitPhaseFailure(true);
@@ -1514,137 +1375,81 @@ public class ShardTest extends AbstractShardTest {
 
     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
         new ShardTestKit(getSystem()) {{
+            final TipProducingDataTree dataTree = createDelegatingMockDataTree();
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                     "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
 
             waitUntilLeader(shard);
 
+            doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")).
+                 doNothing().when(dataTree).validate(any(DataTreeModification.class));
+
             final FiniteDuration duration = duration("5 seconds");
 
             final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
-            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
+            if(readWrite) {
+                shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            } else {
+                shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            }
 
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // Send another can commit to ensure the failed one got cleaned up.
 
-            reset(cohort);
-
             final TransactionIdentifier transactionID2 = nextTransactionId();
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-            doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
-            doReturn(Futures.immediateFuture(null)).when(cohort).commit();
-            final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
-            final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
-            doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
-            doReturn(candidateRoot).when(candidate).getRootNode();
-            doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
-            doReturn(candidate).when(cohort).getCandidate();
-
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
+            if(readWrite) {
+                shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            } else {
+                shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            }
 
             expectMsgClass(duration, CommitTransactionReply.class);
         }};
     }
 
+    @SuppressWarnings("serial")
     @Test
-    public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
-        testImmediateCommitWithCanCommitPhaseFalseResponse(true);
-        testImmediateCommitWithCanCommitPhaseFalseResponse(false);
-    }
-
-    private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
+    public void testAbortWithCommitPending() throws Throwable {
         new ShardTestKit(getSystem()) {{
-            final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
-
-            waitUntilLeader(shard);
-
-            final FiniteDuration duration = duration("5 seconds");
-
-            final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
-            doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
-
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
-
-            expectMsgClass(duration, akka.actor.Status.Failure.class);
-
-            // Send another can commit to ensure the failed one got cleaned up.
-
-            reset(cohort);
-
-            final TransactionIdentifier transactionID2 = nextTransactionId();
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-            doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
-            doReturn(Futures.immediateFuture(null)).when(cohort).commit();
-            final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
-            final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
-            doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
-            doReturn(candidateRoot).when(candidate).getRootNode();
-            doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
-            doReturn(candidate).when(cohort).getCandidate();
-
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
-
-            expectMsgClass(duration, CommitTransactionReply.class);
-        }};
-    }
+            final Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(newShardBuilder()) {
+                        @Override
+                        void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
+                            // Simulate an AbortTransaction message occurring during replication, after
+                            // persisting and before finishing the commit to the in-memory store.
 
-    @Test
-    public void testAbortBeforeFinishCommit() throws Throwable {
-        testAbortBeforeFinishCommit(true);
-        testAbortBeforeFinishCommit(false);
-    }
+                            doAbortTransaction(transactionId, null);
+                            super.persistPayload(transactionId, payload);
+                        }
+                    };
+                }
+            };
 
-    private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
-        new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortBeforeFinishCommit-" + readWrite);
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testAbortWithCommitPending");
 
             waitUntilLeader(shard);
 
             final FiniteDuration duration = duration("5 seconds");
-            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             final TransactionIdentifier transactionID = nextTransactionId();
-            final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
-                          cohort -> {
-                final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
-
-                // Simulate an AbortTransaction message occurring during replication, after
-                // persisting and before finishing the commit to the in-memory store.
-                // We have no followers so due to optimizations in the RaftActor, it does not
-                // attempt replication and thus we can't send an AbortTransaction message b/c
-                // it would be processed too late after CommitTransaction completes. So we'll
-                // simulate an AbortTransaction message occurring during replication by calling
-                // the shard directly.
-                //
-                shard.underlyingActor().doAbortTransaction(transactionID, null);
-
-                return preCommitFuture;
-            };
-
-            final MutableCompositeModification modification = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
-                    modification, preCommit);
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+            shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-            final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                expectMsgClass(duration, CanCommitTransactionReply.class));
-            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
 
             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.class);
@@ -1660,55 +1465,33 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testTransactionCommitTimeout() throws Throwable {
-        testTransactionCommitTimeout(true);
-        testTransactionCommitTimeout(false);
-    }
-
-    private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
-
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testTransactionCommitTimeout-" + readWrite);
+                    "testTransactionCommitTimeout");
 
             waitUntilLeader(shard);
 
             final FiniteDuration duration = duration("5 seconds");
 
-            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-            // Create 1st Tx - will timeout
+            // Ready 2 Tx's - the first will timeout
 
             final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification1 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
-                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
-                    modification1);
-
-            // Create 2nd Tx
+            shard.tell(prepareBatchedModifications(transactionID1, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             final TransactionIdentifier transactionID2 = nextTransactionId();
-            final MutableCompositeModification modification2 = new MutableCompositeModification();
             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
-            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
-                    listNodePath,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
-                    modification2);
-
-            // Ready the Tx's
-
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.class);
-
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+            shard.tell(prepareBatchedModifications(transactionID2, listNodePath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
@@ -1736,71 +1519,73 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
-    @Test
-    public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
-        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
-
-        new ShardTestKit(getSystem()) {{
-            final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testTransactionCommitQueueCapacityExceeded");
-
-            waitUntilLeader(shard);
-
-            final FiniteDuration duration = duration("5 seconds");
-
-            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
-            final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification1 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
-            final TransactionIdentifier transactionID2 = nextTransactionId();
-            final MutableCompositeModification modification2 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
-                    TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
-                    modification2);
-
-            final TransactionIdentifier transactionID3 = nextTransactionId();
-            final MutableCompositeModification modification3 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
-
-            // Ready the Tx's
-
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.class);
-
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.class);
-
-            // The 3rd Tx should exceed queue capacity and fail.
-
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
-            expectMsgClass(duration, akka.actor.Status.Failure.class);
-
-            // canCommit 1st Tx.
-
-            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
-            expectMsgClass(duration, CanCommitTransactionReply.class);
-
-            // canCommit the 2nd Tx - it should get queued.
-
-            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
-
-            // canCommit the 3rd Tx - should exceed queue capacity and fail.
-
-            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
-            expectMsgClass(duration, akka.actor.Status.Failure.class);
-        }};
-    }
+//    @Test
+//    @Ignore
+//    public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
+//        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
+//
+//        new ShardTestKit(getSystem()) {{
+//            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+//                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+//                    "testTransactionCommitQueueCapacityExceeded");
+//
+//            waitUntilLeader(shard);
+//
+//            final FiniteDuration duration = duration("5 seconds");
+//
+//            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+//
+//            final TransactionIdentifier transactionID1 = nextTransactionId();
+//            final MutableCompositeModification modification1 = new MutableCompositeModification();
+//            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+//                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
+//                    modification1);
+//
+//            final TransactionIdentifier transactionID2 = nextTransactionId();
+//            final MutableCompositeModification modification2 = new MutableCompositeModification();
+//            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+//                    TestModel.OUTER_LIST_PATH,
+//                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
+//                    modification2);
+//
+//            final TransactionIdentifier transactionID3 = nextTransactionId();
+//            final MutableCompositeModification modification3 = new MutableCompositeModification();
+//            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+//                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
+//                    modification3);
+//
+//            // Ready the Tx's
+//
+//            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+//            expectMsgClass(duration, ReadyTransactionReply.class);
+//
+//            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+//            expectMsgClass(duration, ReadyTransactionReply.class);
+//
+//            // The 3rd Tx should exceed queue capacity and fail.
+//
+//            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
+//            expectMsgClass(duration, akka.actor.Status.Failure.class);
+//
+//            // canCommit 1st Tx.
+//
+//            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+//            expectMsgClass(duration, CanCommitTransactionReply.class);
+//
+//            // canCommit the 2nd Tx - it should get queued.
+//
+//            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+//
+//            // canCommit the 3rd Tx - should exceed queue capacity and fail.
+//
+//            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+//            expectMsgClass(duration, akka.actor.Status.Failure.class);
+//        }};
+//    }
 
     @Test
     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
-        dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
-
+        dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -1810,30 +1595,19 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
             final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification1 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final TransactionIdentifier transactionID2 = nextTransactionId();
-            final MutableCompositeModification modification2 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
-
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             final TransactionIdentifier transactionID3 = nextTransactionId();
-            final MutableCompositeModification modification3 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
-                    TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
-
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
+            shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
@@ -1846,8 +1620,7 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
-        dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
-
+        dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -1860,14 +1633,11 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification1 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+            shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            // CanCommit the first one so it's the current in-progress CohortEntry.
+            // CanCommit the first Tx so it's the current in-progress Tx.
 
             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, CanCommitTransactionReply.class);
@@ -1875,11 +1645,8 @@ public class ShardTest extends AbstractShardTest {
             // Ready the second Tx.
 
             final TransactionIdentifier transactionID2 = nextTransactionId();
-            final MutableCompositeModification modification2 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
-
-            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+            shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Ready the third Tx.
@@ -1888,9 +1655,8 @@ public class ShardTest extends AbstractShardTest {
             final DataTreeModification modification3 = dataStore.newModification();
             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
                     .apply(modification3);
-                modification3.ready();
+            modification3.ready();
             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
-
             shard.tell(readyMessage, getRef());
 
             // Commit the first Tx. After completing, the second should expire from the queue and the third
@@ -1921,45 +1687,33 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testAbortCurrentTransaction() throws Throwable {
-        testAbortCurrentTransaction(true);
-        testAbortCurrentTransaction(false);
-    }
-
-    private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
+    public void testAbortAfterCanCommit() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortCurrentTransaction-" + readWrite);
+                    "testAbortAfterCanCommit");
 
             waitUntilLeader(shard);
 
-            // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
-
-            final TransactionIdentifier transactionID1 = nextTransactionId();
-            final MutableCompositeModification modification1 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
-            doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
-
-            final TransactionIdentifier transactionID2 = nextTransactionId();
-            final MutableCompositeModification modification2 = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
             final FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+            // Ready 2 transactions - the first one will be aborted.
+
+            final TransactionIdentifier transactionID1 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+            final TransactionIdentifier transactionID2 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
-            final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
                     expectMsgClass(duration, CanCommitTransactionReply.class));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
@@ -1977,78 +1731,101 @@ public class ShardTest extends AbstractShardTest {
 
             // Wait for the 2nd Tx to complete the canCommit phase.
 
-            Await.ready(canCommitFuture, duration);
-
-            final InOrder inOrder = inOrder(cohort1, cohort2);
-            inOrder.verify(cohort1).canCommit();
-            inOrder.verify(cohort2).canCommit();
+            canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
         }};
     }
 
     @Test
-    public void testAbortQueuedTransaction() throws Throwable {
-        testAbortQueuedTransaction(true);
-        testAbortQueuedTransaction(false);
-    }
-
-    private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
+    public void testAbortAfterReady() throws Throwable {
         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
         new ShardTestKit(getSystem()) {{
-            final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
-            @SuppressWarnings("serial")
-            final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
-                @Override
-                public void handleCommand(final Object message) {
-                    super.handleCommand(message);
-                    if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
-                        if(cleaupCheckLatch.get() != null) {
-                            cleaupCheckLatch.get().countDown();
-                        }
-                    }
-                }
-            };
-
             final TestActorRef<Shard> shard = actorFactory.createTestActor(
-                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(
-                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
 
             waitUntilLeader(shard);
 
-            final TransactionIdentifier transactionID = nextTransactionId();
-            final MutableCompositeModification modification = new MutableCompositeModification();
-            final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
-            doReturn(Futures.immediateFuture(null)).when(cohort).abort();
-
             final FiniteDuration duration = duration("5 seconds");
 
-            // Ready the tx.
+            // Ready a tx.
 
-            shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+            final TransactionIdentifier transactionID1 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
-
             // Send the AbortTransaction message.
 
-            shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+            shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             expectMsgClass(duration, AbortTransactionReply.class);
 
-            verify(cohort).abort();
-
-            // Verify the tx cohort is removed from queue at the cleanup check interval.
-
-            cleaupCheckLatch.set(new CountDownLatch(1));
-            assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
-                    cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
-
             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
 
             // Now send CanCommitTransaction - should fail.
 
-            shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
             assertTrue("Failure type", failure instanceof IllegalStateException);
+
+            // Ready and CanCommit another and verify success.
+
+            final TransactionIdentifier transactionID2 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
+        }};
+    }
+
+    @Test
+    public void testAbortQueuedTransaction() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            // Ready 3 tx's.
+
+            final TransactionIdentifier transactionID1 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            final TransactionIdentifier transactionID2 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            final TransactionIdentifier transactionID3 = nextTransactionId();
+            shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // Abort the second tx while it's queued.
+
+            shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.class);
+
+            // Commit the other 2.
+
+            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
+
+            shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
+
+            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.class);
+
+            shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.class);
+
+            assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
         }};
     }
 
@@ -2084,7 +1861,7 @@ public class ShardTest extends AbstractShardTest {
         new ShardTestKit(getSystem()) {{
             class TestShard extends Shard {
 
-                protected TestShard(AbstractBuilder<?, ?> builder) {
+                protected TestShard(final AbstractBuilder<?, ?> builder) {
                     super(builder);
                     setPersistence(new TestPersistentDataProvider(super.persistence()));
                 }
@@ -2125,7 +1902,7 @@ public class ShardTest extends AbstractShardTest {
             awaitAndValidateSnapshot(expectedRoot);
         }
 
-        private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
+        private void awaitAndValidateSnapshot(final NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
@@ -2371,8 +2148,6 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilNoLeader(shard);
 
-            final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                     RegisterDataTreeChangeListenerReply.class);
index de832c0..dae71b9 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.Props;
 import akka.testkit.TestActorRef;
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
@@ -39,7 +40,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     private static final TransactionType RW = TransactionType.READ_WRITE;
     private static final TransactionType WO = TransactionType.WRITE_ONLY;
 
-    private static final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
+    private static final Shard mockShard = Mockito.mock(Shard.class);
+
+    private static final ShardDataTree store = new ShardDataTree(mockShard, testSchemaContext, TreeType.OPERATIONAL);
 
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.create("inventory", MemberName.forName("member-1"), "operational");
index 55060d1..917c374 100644 (file)
@@ -17,17 +17,16 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.Status.Failure;
 import akka.actor.Terminated;
+import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import java.util.concurrent.TimeUnit;
-import org.junit.Ignore;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -41,19 +40,17 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class ShardTransactionTest extends AbstractActorTest {
 
-    private static final SchemaContext testSchemaContext = TestModel.createTestContext();
     private static final TransactionType RO = TransactionType.READ_ONLY;
     private static final TransactionType RW = TransactionType.READ_WRITE;
     private static final TransactionType WO = TransactionType.WRITE_ONLY;
@@ -61,28 +58,24 @@ public class ShardTransactionTest extends AbstractActorTest {
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
 
+    private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
 
-    private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
 
-    private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
+    private TestActorRef<Shard> shard;
+    private ShardDataTree store;
 
-    private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
-
-    private ActorRef createShard() {
-        ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
-                schemaContext(TestModel.createTestContext()).props());
+    @Before
+    public void setUp() {
+        shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
+                schemaContext(TestModel.createTestContext()).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
         ShardTestKit.waitUntilLeader(shard);
-        return shard;
-    }
-
-    private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
-        return newTransactionActor(type, transaction, null, name);
+        store = shard.underlyingActor().getDataStore();
     }
 
-    private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
-        Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
-                datastoreContext, shardStats);
-        return getSystem().actorOf(props, name);
+    private ActorRef newTransactionActor(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction, final String name) {
+        Props props = ShardTransaction.props(type, transaction, shard, datastoreContext, shard.underlyingActor().getShardMBean());
+        return actorFactory.createActor(props, name);
     }
 
     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
@@ -96,11 +89,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
+            testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
 
-            testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
-
-            testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
+            testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
         }
 
         private void testOnReceiveReadData(final ActorRef transaction) {
@@ -116,13 +107,11 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-
             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
-                    RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
+                    RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO"));
 
             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
-                    RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
+                    RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW"));
         }
 
         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
@@ -137,12 +126,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDataExistsPositive() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-
-            testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
+            testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(),
                     "testDataExistsPositiveRO"));
 
-            testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
+            testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(),
                     "testDataExistsPositiveRW"));
         }
 
@@ -159,12 +146,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDataExistsNegative() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-
-            testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
+            testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(),
                     "testDataExistsNegativeRO"));
 
-            testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
+            testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(),
                     "testDataExistsNegativeRW"));
         }
 
@@ -384,19 +369,6 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
-    // Unknown operations are being logged
-    @Ignore
-    @Test(expected=UnknownMessageException.class)
-    public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
-        final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
-                datastoreContext, shardStats);
-        final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
-
-        transaction.receive(new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION),
-                ActorRef.noSender());
-    }
-
     @Test
     public void testShardTransactionInactivity() {
 
index 657d7b3..1830290 100644 (file)
@@ -7,29 +7,31 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.Collections;
+import java.util.Optional;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import scala.concurrent.Promise;
 
 /**
  * Unit tests for SimpleShardDataTreeCohort.
@@ -37,106 +39,198 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree
  * @author Thomas Pantelis
  */
 public class SimpleShardDataTreeCohortTest extends AbstractTest {
-    @Mock
-    private TipProducingDataTree mockDataTree;
-
     @Mock
     private ShardDataTree mockShardDataTree;
 
     @Mock
     private DataTreeModification mockModification;
 
+    @Mock
+    private CompositeDataTreeCohort mockUserCohorts;
+
+    @Mock
+    private FutureCallback<DataTreeCandidate> mockPreCallback;
+
     private SimpleShardDataTreeCohort cohort;
 
     @Before
-    public void setup() {
+    public void setup() throws Exception {
         MockitoAnnotations.initMocks(this);
 
-        doReturn(mockDataTree).when(mockShardDataTree).getDataTree();
+        doNothing().when(mockUserCohorts).commit();
+        doReturn(Optional.empty()).when(mockUserCohorts).abort();
 
-        cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId());
+        cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(),
+            mockUserCohorts);
     }
 
     @Test
     public void testCanCommitSuccess() throws Exception {
-        ListenableFuture<Boolean> future = cohort.canCommit();
-        assertNotNull("Future is null", future);
-        assertEquals("Future", true, future.get(3, TimeUnit.SECONDS));
-        verify(mockDataTree).validate(mockModification);
+        canCommitSuccess();
     }
 
-    @Test(expected=OptimisticLockFailedException.class)
-    public void testCanCommitWithConflictingModEx() throws Throwable {
-        doThrow(new ConflictingModificationAppliedException(YangInstanceIdentifier.EMPTY, "mock")).
-                when(mockDataTree).validate(mockModification);
-        try {
-            cohort.canCommit().get();
-        } catch (ExecutionException e) {
-            throw e.getCause();
-        }
+    private void canCommitSuccess() {
+        doAnswer(invocation -> {
+            invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCanCommit();
+            return null;
+        }).when(mockShardDataTree).startCanCommit(cohort);
+
+        @SuppressWarnings("unchecked")
+        final FutureCallback<Void> callback = mock(FutureCallback.class);
+        cohort.canCommit(callback);
+
+        verify(callback).onSuccess(null);
+        verifyNoMoreInteractions(callback);
     }
 
-    @Test(expected=TransactionCommitFailedException.class)
-    public void testCanCommitWithDataValidationEx() throws Throwable {
-        doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock")).
-                when(mockDataTree).validate(mockModification);
-        try {
-            cohort.canCommit().get();
-        } catch (ExecutionException e) {
-            throw e.getCause();
-        }
+    private void testValidatationPropagates(final Exception cause) throws DataValidationFailedException {
+        doAnswer(invocation -> {
+            invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).failedCanCommit(cause);
+            return null;
+        }).when(mockShardDataTree).startCanCommit(cohort);
+
+        @SuppressWarnings("unchecked")
+        final FutureCallback<Void> callback = mock(FutureCallback.class);
+        cohort.canCommit(callback);
+
+        verify(callback).onFailure(cause);
+        verifyNoMoreInteractions(callback);
     }
 
-    @Test(expected=IllegalArgumentException.class)
-    public void testCanCommitWithIllegalArgumentEx() throws Throwable {
-        doThrow(new IllegalArgumentException("mock")).when(mockDataTree).validate(mockModification);
-        try {
-            cohort.canCommit().get();
-        } catch (ExecutionException e) {
-            throw e.getCause();
-        }
+    @Test
+    public void testCanCommitWithConflictingModEx() throws DataValidationFailedException {
+        testValidatationPropagates(new ConflictingModificationAppliedException(YangInstanceIdentifier.EMPTY, "mock"));
     }
 
     @Test
-    public void testPreCommitAndCommitSuccess() throws Exception {
-        DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class);
-        doReturn(mockCandidate ).when(mockDataTree).prepare(mockModification);
+    public void testCanCommitWithDataValidationEx() throws DataValidationFailedException {
+        testValidatationPropagates(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock"));
+    }
 
-        ListenableFuture<Void> future = cohort.preCommit();
-        assertNotNull("Future is null", future);
-        future.get();
-        verify(mockDataTree).prepare(mockModification);
+    @Test
+    public void testCanCommitWithIllegalArgumentEx() throws DataValidationFailedException {
+        testValidatationPropagates(new IllegalArgumentException("mock"));
+    }
+
+    private DataTreeCandidateTip preCommitSuccess() {
+        final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class);
+        doAnswer(invocation -> {
+            invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulPreCommit(mockCandidate);
+            return null;
+        }).when(mockShardDataTree).startPreCommit(cohort);
+
+        @SuppressWarnings("unchecked")
+        final FutureCallback<DataTreeCandidate> callback = mock(FutureCallback.class);
+        cohort.preCommit(callback);
+
+        verify(callback).onSuccess(mockCandidate);
+        verifyNoMoreInteractions(callback);
 
         assertSame("getCandidate", mockCandidate, cohort.getCandidate());
 
-        future = cohort.commit();
-        assertNotNull("Future is null", future);
-        future.get();
-        verify(mockDataTree).commit(mockCandidate);
+        return mockCandidate;
+    }
+
+    @Test
+    public void testPreCommitAndCommitSuccess() throws Exception {
+        canCommitSuccess();
+        final DataTreeCandidateTip candidate = preCommitSuccess();
+
+        doAnswer(invocation -> {
+            invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCommit(UnsignedLong.valueOf(0));
+            return null;
+        }).when(mockShardDataTree).startCommit(cohort, candidate);
+
+        @SuppressWarnings("unchecked")
+        final
+        FutureCallback<UnsignedLong> mockCommitCallback = mock(FutureCallback.class);
+        cohort.commit(mockCommitCallback);
+
+        verify(mockCommitCallback).onSuccess(any(UnsignedLong.class));
+        verifyNoMoreInteractions(mockCommitCallback);
+
+        verify(mockUserCohorts).commit();
     }
 
-    @Test(expected=IllegalArgumentException.class)
+    @Test
     public void testPreCommitWithIllegalArgumentEx() throws Throwable {
-        doThrow(new IllegalArgumentException("mock")).when(mockDataTree).prepare(mockModification);
-        try {
-            cohort.preCommit().get();
-        } catch (ExecutionException e) {
-            throw e.getCause();
-        }
+        canCommitSuccess();
+
+        final Exception cause = new IllegalArgumentException("mock");
+        doAnswer(invocation -> {
+            invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).failedPreCommit(cause);
+            return null;
+        }).when(mockShardDataTree).startPreCommit(cohort);
+
+        @SuppressWarnings("unchecked")
+        final FutureCallback<DataTreeCandidate> callback = mock(FutureCallback.class);
+        cohort.preCommit(callback);
+
+        verify(callback).onFailure(cause);
+        verifyNoMoreInteractions(callback);
+
+        verify(mockUserCohorts).abort();
     }
 
-    @Test(expected=IllegalArgumentException.class)
-    public void testCommitWithIllegalArgumentEx() throws Throwable {
-        doThrow(new IllegalArgumentException("mock")).when(mockDataTree).commit(any(DataTreeCandidateTip.class));
-        try {
-            cohort.commit().get();
-        } catch (ExecutionException e) {
-            throw e.getCause();
-        }
+    @Test
+    public void testPreCommitWithReportedFailure() throws Throwable {
+        canCommitSuccess();
+
+        final Exception cause = new IllegalArgumentException("mock");
+        cohort.reportFailure(cause);
+
+        @SuppressWarnings("unchecked")
+        final FutureCallback<DataTreeCandidate> callback = mock(FutureCallback.class);
+        cohort.preCommit(callback);
+
+        verify(callback).onFailure(cause);
+        verifyNoMoreInteractions(callback);
+
+        verify(mockShardDataTree, never()).startPreCommit(cohort);
+    }
+
+    @Test
+    public void testCommitWithIllegalArgumentEx() {
+        canCommitSuccess();
+        final DataTreeCandidateTip candidate = preCommitSuccess();
+
+        final Exception cause = new IllegalArgumentException("mock");
+        doAnswer(invocation -> {
+            invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).failedCommit(cause);
+            return null;
+        }).when(mockShardDataTree).startCommit(cohort, candidate);
+
+        @SuppressWarnings("unchecked")
+        final FutureCallback<UnsignedLong> callback = mock(FutureCallback.class);
+        cohort.commit(callback);
+
+        verify(callback).onFailure(cause);
+        verifyNoMoreInteractions(callback);
+
+        verify(mockUserCohorts).abort();
     }
 
     @Test
     public void testAbort() throws Exception {
+        doNothing().when(mockShardDataTree).startAbort(cohort);
+
         cohort.abort().get();
+
+        verify(mockShardDataTree).startAbort(cohort);
+    }
+
+    @Test
+    public void testAbortWithCohorts() throws Exception {
+        doNothing().when(mockShardDataTree).startAbort(cohort);
+
+        final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
+        doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();
+
+        final ListenableFuture<Void> abortFuture = cohort.abort();
+
+        cohortFuture.success(Collections.emptyList());
+
+        abortFuture.get();
+        verify(mockShardDataTree).startAbort(cohort);
     }
 }
index 33fb4e3..66bd048 100644 (file)
@@ -15,8 +15,12 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import akka.testkit.JavaTestKit;
 import com.google.common.collect.ImmutableSet;
 import java.util.concurrent.TimeUnit;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
@@ -40,8 +44,16 @@ public class CandidateListChangeListenerTest extends AbstractActorTest {
     private static final YangInstanceIdentifier ENTITY_ID2 =
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
 
-    private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(),
-        TreeType.OPERATIONAL);
+    private ShardDataTree shardDataTree;
+
+    @Mock
+    private Shard mockShard;
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+        shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(), TreeType.OPERATIONAL);
+    }
 
     @Test
     public void testOnDataTreeChanged() throws Exception {
@@ -95,11 +107,11 @@ public class CandidateListChangeListenerTest extends AbstractActorTest {
                 ImmutableSet.copyOf(candidateRemoved.getRemainingCandidates()));
     }
 
-    private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
+    private void writeNode(final YangInstanceIdentifier path, final NormalizedNode<?, ?> node) throws DataValidationFailedException {
         AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
     }
 
-    private void deleteNode(YangInstanceIdentifier path) throws DataValidationFailedException {
+    private void deleteNode(final YangInstanceIdentifier path) throws DataValidationFailedException {
         AbstractEntityOwnershipTest.deleteNode(path, shardDataTree);
     }
 }
index 6d092cf..52adc5a 100644 (file)
@@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
@@ -220,7 +221,9 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh
         DistributedEntityOwnershipService service = spy(DistributedEntityOwnershipService.start(
             dataStore.getActorContext(), EntityOwnerSelectionStrategyConfig.newBuilder().build()));
 
-        ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(), TreeType.OPERATIONAL);
+        final Shard mockShard = Mockito.mock(Shard.class);
+        ShardDataTree shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(),
+            TreeType.OPERATIONAL);
 
         when(service.getLocalEntityOwnershipShardDataTree()).thenReturn(shardDataTree.getDataTree());
 
index 3c4fea0..17c6d11 100644 (file)
@@ -19,7 +19,9 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
@@ -47,7 +49,9 @@ public class EntityOwnerChangeListenerTest {
     private static final DOMEntity ENTITY1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
     private static final DOMEntity ENTITY2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
 
-    private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(),
+    private final Shard mockShard = Mockito.mock(Shard.class);
+
+    private final ShardDataTree shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(),
         TreeType.OPERATIONAL);
     private final EntityOwnershipListenerSupport mockListenerSupport = mock(EntityOwnershipListenerSupport.class);
     private EntityOwnerChangeListener listener;
@@ -133,7 +137,7 @@ public class EntityOwnerChangeListenerTest {
                 anyBoolean(), anyBoolean());
     }
 
-    private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
+    private void writeNode(final YangInstanceIdentifier path, final NormalizedNode<?, ?> node) throws DataValidationFailedException {
         AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
     }
 }
index d3c3d1d..af34790 100644 (file)
@@ -592,7 +592,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
                 ImmutableMap.<String, String>builder().put(localId.toString(), shard.path().toString()).build(),
-                    LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
+                    leaderId.getMemberName().getName(), EntityOwnerSelectionStrategyConfig.newBuilder().build())
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
         leader.tell(TimeoutNow.INSTANCE, leader);
 
         ShardTestKit.waitUntilLeader(leader);
@@ -833,7 +834,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
                 DatastoreContext datastoreContext) {
             super(newBuilder().id(name).peerAddresses(peerAddresses).datastoreContext(datastoreContext).
-                    schemaContext(SCHEMA_CONTEXT).localMemberName(MemberName.forName(LOCAL_MEMBER_NAME)));
+                    schemaContext(SCHEMA_CONTEXT).localMemberName(name.getMemberName()));
         }
 
         @Override
index a621ac6..320e830 100644 (file)
@@ -16,7 +16,9 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -35,7 +37,9 @@ public class EntityOwnershipStatisticsTest extends AbstractActorTest {
     private static final YangInstanceIdentifier ENTITY_ID2 =
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
 
-    private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(),
+    private final Shard mockShard = Mockito.mock(Shard.class);
+
+    private final ShardDataTree shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(),
         TreeType.OPERATIONAL);
     private EntityOwnershipStatistics ownershipStatistics;
 
@@ -132,11 +136,11 @@ public class EntityOwnershipStatisticsTest extends AbstractActorTest {
 
     }
 
-    private static void assertStatistics(Map<String, Map<String, Long>> statistics, String memberName, long val) {
+    private static void assertStatistics(final Map<String, Map<String, Long>> statistics, final String memberName, final long val) {
         assertEquals(val, statistics.get(ENTITY_TYPE).get(memberName).longValue());
     }
 
-    private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
+    private void writeNode(final YangInstanceIdentifier path, final NormalizedNode<?, ?> node) throws DataValidationFailedException {
         AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
     }
 }
\ No newline at end of file
index 7bf7d50..c078c94 100644 (file)
@@ -30,7 +30,9 @@ import java.lang.reflect.Method;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
@@ -80,7 +82,7 @@ public class PruningDataTreeModificationTest {
         realModification = dataTree.takeSnapshot().newModification();
         proxyModification = Reflection.newProxy(DataTreeModification.class, new InvocationHandler() {
             @Override
-            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+            public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
                 try {
                     method.invoke(mockModification, args);
                     return method.invoke(realModification, args);
@@ -201,8 +203,10 @@ public class PruningDataTreeModificationTest {
     }
 
     @Test
-    public void testWriteRootNodeWithInvalidChild() throws Exception{
-        ShardDataTree shardDataTree = new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION);
+    public void testWriteRootNodeWithInvalidChild() throws Exception {
+        final Shard mockShard = Mockito.mock(Shard.class);
+
+        ShardDataTree shardDataTree = new ShardDataTree(mockShard, SCHEMA_CONTEXT, TreeType.CONFIGURATION);
         NormalizedNode<?, ?> root = shardDataTree.readNode(YangInstanceIdentifier.EMPTY).get();
 
         NormalizedNode<?, ?> normalizedNode = ImmutableContainerNodeBuilder.create().withNodeIdentifier(

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.