Elide front-end 3PC for single-shard Tx 92/18392/7
authorTom Pantelis <tpanteli@brocade.com>
Fri, 17 Apr 2015 22:53:21 +0000 (18:53 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Sat, 18 Apr 2015 07:22:36 +0000 (03:22 -0400)
A new method, directCommit, was added to TransactionContext.
In TransactionContextImpl#directCommit, it sends the final ready
BatchedModifications message as before but with a new flag,
doCommitOnReady, set to true. In ShardCommitCoordinator, if
doCommitOnReady is true, it immediately initiates the canCommit
phase without waiting for the CanCommitTransaction message.

In ShardWriteTransaction, if doCommitOnReady is true, it sets a new
similar flag in the ForwardedReadyTransaction message. Similarly,
the ShardCommitCoordinator immediately initiates the canCommit phase.
This code path is executed for read-write transactions as they still
utilize the transaction actor.

In TransactionProxy, when only 1 shard was accessed, it creates a
SingleCommitCohortProxy instance. This class also implements
DOMStoreThreePhaseCommitCohort but the 3 phases are essentially a
no-op with direct commit. The canCommit phase simply waits for the direct
commit Future to complete and returns success (true). If a failure occurs
from the Shard, the exception is propagated to the caller.

For backwards compatibility, we still need the
ThreePhaseCommitCohortProxy even with a single-shard transaction. A
complication with this is that TransactionProxy#ready may not know
immediately if it can do direct commit or not because it may not have the
TransactionContext instance yet due to the async nature of things.
So in either case it still creates a SingleCommitCohortProxy. When it gets
the callback to finally complete the operation, it checks the
TransactionContext to see if it supports direct commit (only
pre-Lithium versions won't). If supported, it calls directCommit, otherwise
readyTransaction. In the SingleCommitCohortProxy, on successful
completion of the ready/direct commit Future, it checks the response
type. If it's an ActorSelection then it needs to do 3-phase commit
so it creates and delegates to a ThreePhaseCommitCohortProxy.

I moved the code in Shard#handleCommitTransaction to the
ShardCommitCoordinator as this is also needed for direct commit. I
also moved the handleForwardedReadyTransaction code into
ShardCommitCoordinator to make it easier to handle direct commit.

Change-Id: I40b04dd5abd24c78709598a5acfc16484e165427
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
26 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCallback.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java

index cac0f5135463f0772fdaf34c4f25297c22e8cf70..7c56f261edb64b55190873d2e9922bace0fe5809 100644 (file)
@@ -7,7 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorSelection;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import scala.concurrent.Future;
 import java.util.List;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import scala.concurrent.Future;
@@ -17,6 +18,9 @@ import scala.concurrent.Future;
  * implementation. In addition to the usual set of methods it also contains the list of actor
  * futures.
  */
  * implementation. In addition to the usual set of methods it also contains the list of actor
  * futures.
  */
-abstract class AbstractThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
-    abstract List<Future<ActorSelection>> getCohortFutures();
+public abstract class AbstractThreePhaseCommitCohort<T> implements DOMStoreThreePhaseCommitCohort {
+    protected static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null);
+    protected static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS = Futures.immediateFuture(Boolean.TRUE);
+
+    abstract List<Future<T>> getCohortFutures();
 }
 }
index ed3aa85c1fc5cd56b1a8f05f88bf871a995b9804..f1f33bfc9340d3ef7ab73711b6b69b176eccd1ba 100644 (file)
@@ -22,20 +22,20 @@ final class ChainedTransactionProxy extends TransactionProxy {
     /**
      * Stores the ready Futures from the previous Tx in the chain.
      */
     /**
      * Stores the ready Futures from the previous Tx in the chain.
      */
-    private final List<Future<ActorSelection>> previousReadyFutures;
+    private final List<Future<Object>> previousReadyFutures;
 
     /**
      * Stores the ready Futures from this transaction when it is readied.
      */
 
     /**
      * Stores the ready Futures from this transaction when it is readied.
      */
-    private volatile List<Future<ActorSelection>> readyFutures;
+    private volatile List<Future<Object>> readyFutures;
 
     ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
 
     ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
+            String transactionChainId, List<Future<Object>> previousReadyFutures) {
         super(actorContext, transactionType, transactionChainId);
         this.previousReadyFutures = previousReadyFutures;
     }
 
         super(actorContext, transactionType, transactionChainId);
         this.previousReadyFutures = previousReadyFutures;
     }
 
-    List<Future<ActorSelection>> getReadyFutures() {
+    List<Future<Object>> getReadyFutures() {
         return readyFutures;
     }
 
         return readyFutures;
     }
 
@@ -43,10 +43,11 @@ final class ChainedTransactionProxy extends TransactionProxy {
         return readyFutures != null;
     }
 
         return readyFutures != null;
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     @Override
-    public AbstractThreePhaseCommitCohort ready() {
-        final AbstractThreePhaseCommitCohort ret = super.ready();
-        readyFutures = ret.getCohortFutures();
+    public AbstractThreePhaseCommitCohort<?> ready() {
+        final AbstractThreePhaseCommitCohort<?> ret = super.ready();
+        readyFutures = (List)ret.getCohortFutures();
         LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
             readyFutures.size(), getTransactionChainId());
         return ret;
         LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
             readyFutures.size(), getTransactionChainId());
         return ret;
@@ -70,14 +71,14 @@ final class ChainedTransactionProxy extends TransactionProxy {
         }
 
         // Combine the ready Futures into 1.
         }
 
         // Combine the ready Futures into 1.
-        Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
+        Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                 previousReadyFutures, getActorContext().getClientDispatcher());
 
         // Add a callback for completion of the combined Futures.
         final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
                 previousReadyFutures, getActorContext().getClientDispatcher());
 
         // Add a callback for completion of the combined Futures.
         final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
-        OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
+        OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
             @Override
             @Override
-            public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
+            public void onComplete(Throwable failure, Iterable<Object> notUsed) {
                 if(failure != null) {
                     // A Ready Future failed so fail the returned Promise.
                     returnPromise.failure(failure);
                 if(failure != null) {
                     // A Ready Future failed so fail the returned Promise.
                     returnPromise.failure(failure);
index c3cc4998656e82b37354500a1b629534c5622abe..8ae79ceb2dae0c819d8e964f5ab6af8133a25e2f 100644 (file)
@@ -63,7 +63,7 @@ public class DatastoreContext {
     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
-    private boolean writeOnlyTransactionOptimizationsEnabled = false;
+    private boolean writeOnlyTransactionOptimizationsEnabled = true;
 
     public static Set<String> getGlobalDatastoreTypes() {
         return globalDatastoreTypes;
 
     public static Set<String> getGlobalDatastoreTypes() {
         return globalDatastoreTypes;
index 376b6580464cd08f8942d2b800d5fccb69b333f0..1f5f5bcf79351764eb7776634a950f59077f63dc 100644 (file)
@@ -7,8 +7,6 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorSelection;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
 import java.util.List;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
 import java.util.List;
@@ -18,12 +16,9 @@ import scala.concurrent.Future;
  * A {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}
  * instance given out for empty transactions.
  */
  * A {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}
  * instance given out for empty transactions.
  */
-final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort {
+final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort<Object> {
     static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
 
     static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
 
-    private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null);
-    private static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS = Futures.immediateFuture(Boolean.TRUE);
-
     private NoOpDOMStoreThreePhaseCommitCohort() {
         // Hidden to prevent instantiation
     }
     private NoOpDOMStoreThreePhaseCommitCohort() {
         // Hidden to prevent instantiation
     }
@@ -49,7 +44,7 @@ final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitC
     }
 
     @Override
     }
 
     @Override
-    List<Future<ActorSelection>> getCohortFutures() {
+    List<Future<Object>> getCohortFutures() {
         return Collections.emptyList();
     }
 }
         return Collections.emptyList();
     }
 }
index 672560bbdd5c6b01a149e60dae7ae00d3d309f2f..197cd9fa83c00b569b25d59d982328774fb44436 100644 (file)
@@ -36,9 +36,21 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
         LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier());
     }
 
         LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier());
     }
 
+    @Override
+    public boolean supportsDirectCommit() {
+        return true;
+    }
+
+    @Override
+    public Future<Object> directCommit() {
+        LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
+        operationLimiter.release();
+        return akka.dispatch.Futures.failed(failure);
+    }
+
     @Override
     public Future<ActorSelection> readyTransaction() {
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called", getIdentifier());
+        LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
         operationLimiter.release();
         return akka.dispatch.Futures.failed(failure);
     }
         operationLimiter.release();
         return akka.dispatch.Futures.failed(failure);
     }
index b9445361bb9ff43521b124c5c20c6aa6b1c06358..592e6bc9c9f5ea0c1856dc2674aa5c0e81e79180 100644 (file)
@@ -8,7 +8,31 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
 
 package org.opendaylight.controller.cluster.datastore;
 
-public interface OperationCallback {
+import java.util.concurrent.atomic.AtomicReference;
+
+interface OperationCallback {
+    OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+        @Override
+        public void run() {
+        }
+
+        @Override
+        public void success() {
+        }
+
+        @Override
+        public void failure() {
+        }
+    };
+
+    class Reference extends AtomicReference<OperationCallback> {
+        private static final long serialVersionUID = 1L;
+
+        public Reference(OperationCallback initialValue) {
+            super(initialValue);
+        }
+    }
+
     void run();
     void success();
     void failure();
     void run();
     void success();
     void failure();
index 7110adc625ac52aa6b54717052a8f511beef89a7..91e072b076ef7c68116009c94127df859ef7540b 100644 (file)
@@ -30,7 +30,6 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
-import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
@@ -40,7 +39,6 @@ import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@ -49,7 +47,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -107,9 +104,6 @@ public class Shard extends RaftActor {
 
     private final MessageTracker appendEntriesReplyTracker;
 
 
     private final MessageTracker appendEntriesReplyTracker;
 
-    private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
-            Serialization.serializedActorPath(getSelf()));
-
     private final DOMTransactionFactory domTransactionFactory;
 
     private final ShardTransactionActorFactory transactionActorFactory;
     private final DOMTransactionFactory domTransactionFactory;
 
     private final ShardTransactionActorFactory transactionActorFactory;
@@ -240,7 +234,8 @@ public class Shard extends RaftActor {
             } else if (BatchedModifications.class.isInstance(message)) {
                 handleBatchedModifications((BatchedModifications)message);
             } else if (message instanceof ForwardedReadyTransaction) {
             } else if (BatchedModifications.class.isInstance(message)) {
                 handleBatchedModifications((BatchedModifications)message);
             } else if (message instanceof ForwardedReadyTransaction) {
-                handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+                commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
+                        getSender(), this);
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
             } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
             } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
@@ -310,54 +305,22 @@ public class Shard extends RaftActor {
         }
     }
 
         }
     }
 
-    private void handleCommitTransaction(final CommitTransaction commit) {
-        final String transactionID = commit.getTransactionID();
-
-        LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
-
-        // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
-        // this transaction.
-        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry == null) {
-            // We're not the current Tx - the Tx was likely expired b/c it took too long in
-            // between the canCommit and commit messages.
-            IllegalStateException ex = new IllegalStateException(
-                    String.format("%s: Cannot commit transaction %s - it is not the current transaction",
-                            persistenceId(), transactionID));
-            LOG.error(ex.getMessage());
-            shardMBean.incrementFailedTransactionsCount();
-            getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
-            return;
+    void continueCommit(final CohortEntry cohortEntry) throws Exception {
+        // If we do not have any followers and we are not using persistence
+        // or if cohortEntry has no modifications
+        // we can apply modification to the state immediately
+        if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
+            applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification());
+        } else {
+            Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+                    new ModificationPayload(cohortEntry.getModification()));
         }
         }
+    }
 
 
-        // We perform the preCommit phase here atomically with the commit phase. This is an
-        // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
-        // coordination of preCommit across shards in case of failure but preCommit should not
-        // normally fail since we ensure only one concurrent 3-phase commit.
-
-        try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().preCommit().get();
-
-            // If we do not have any followers and we are not using persistence
-            // or if cohortEntry has no modifications
-            // we can apply modification to the state immediately
-            if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
-                applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
-            } else {
-                Shard.this.persistData(getSender(), transactionID,
-                        new ModificationPayload(cohortEntry.getModification()));
-            }
-        } catch (Exception e) {
-            LOG.error("{} An exception occurred while preCommitting transaction {}",
-                    persistenceId(), cohortEntry.getTransactionID(), e);
+    private void handleCommitTransaction(final CommitTransaction commit) {
+        if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
             shardMBean.incrementFailedTransactionsCount();
             shardMBean.incrementFailedTransactionsCount();
-            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
         }
-
-        cohortEntry.updateLastAccessTime();
     }
 
     private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
     }
 
     private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
@@ -415,7 +378,7 @@ public class Shard extends RaftActor {
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
-        commitCoordinator.handleCanCommit(canCommit, getSender(), self());
+        commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
     private void handleBatchedModifications(BatchedModifications batched) {
     }
 
     private void handleBatchedModifications(BatchedModifications batched) {
@@ -433,12 +396,7 @@ public class Shard extends RaftActor {
         //
         if(isLeader()) {
             try {
         //
         if(isLeader()) {
             try {
-                boolean ready = commitCoordinator.handleTransactionModifications(batched);
-                if(ready) {
-                    sender().tell(READY_TRANSACTION_REPLY, self());
-                } else {
-                    sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
-                }
+                commitCoordinator.handleBatchedModifications(batched, getSender(), this);
             } catch (Exception e) {
                 LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                         batched.getTransactionID(), e);
             } catch (Exception e) {
                 LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                         batched.getTransactionID(), e);
@@ -463,39 +421,6 @@ public class Shard extends RaftActor {
         }
     }
 
         }
     }
 
-    private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
-        LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
-                ready.getTransactionID(), ready.getTxnClientVersion());
-
-        // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
-        // commitCoordinator in preparation for the subsequent three phase commit initiated by
-        // the front-end.
-        commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
-                (MutableCompositeModification) ready.getModification());
-
-        // Return our actor path as we'll handle the three phase commit, except if the Tx client
-        // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
-        // node. In that case, the subsequent 3-phase commit messages won't contain the
-        // transactionId so to maintain backwards compatibility, we create a separate cohort actor
-        // to provide the compatible behavior.
-        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
-            ActorRef replyActorPath = getSelf();
-            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-                LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
-                replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
-                        ready.getTransactionID()));
-            }
-
-            ReadyTransactionReply readyTransactionReply =
-                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
-                            ready.getTxnClientVersion());
-            getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                readyTransactionReply, getSelf());
-        } else {
-            getSender().tell(READY_TRANSACTION_REPLY, getSelf());
-        }
-    }
-
     private void handleAbortTransaction(final AbortTransaction abort) {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }
     private void handleAbortTransaction(final AbortTransaction abort) {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }
index b96e38d76a45aced0e1c326c051cb7fcbd1d82d1..4ff9b5fd4353e5857ec6533c7a0cfc4ee6ec4ee2 100644 (file)
@@ -21,11 +21,15 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.slf4j.Logger;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.slf4j.Logger;
@@ -56,8 +60,6 @@ public class ShardCommitCoordinator {
 
     private final String name;
 
 
     private final String name;
 
-    private final String shardActorPath;
-
     private final RemovalListener<String, CohortEntry> cacheRemovalListener =
             new RemovalListener<String, CohortEntry>() {
                 @Override
     private final RemovalListener<String, CohortEntry> cacheRemovalListener =
             new RemovalListener<String, CohortEntry>() {
                 @Override
@@ -71,6 +73,8 @@ public class ShardCommitCoordinator {
     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     private CohortDecorator cohortDecorator;
 
     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     private CohortDecorator cohortDecorator;
 
+    private ReadyTransactionReply readyTransactionReply;
+
     public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
             long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
 
     public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
             long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
 
@@ -79,8 +83,6 @@ public class ShardCommitCoordinator {
         this.name = name;
         this.transactionFactory = transactionFactory;
 
         this.name = name;
         this.transactionFactory = transactionFactory;
 
-        shardActorPath = Serialization.serializedActorPath(shardActor);
-
         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
                 removalListener(cacheRemovalListener).build();
 
         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
                 removalListener(cacheRemovalListener).build();
 
@@ -93,18 +95,55 @@ public class ShardCommitCoordinator {
         this.queueCapacity = queueCapacity;
     }
 
         this.queueCapacity = queueCapacity;
     }
 
+    private ReadyTransactionReply readyTransactionReply(Shard shard) {
+        if(readyTransactionReply == null) {
+            readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
+        }
+
+        return readyTransactionReply;
+    }
+
     /**
      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
     /**
      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
-     *
-     * @param transactionID the ID of the transaction
-     * @param cohort the cohort to participate in the transaction commit
-     * @param modification the modifications made by the transaction
      */
      */
-    public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
-            MutableCompositeModification modification) {
+    public void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
+        log.debug("{}: Readying transaction {}, client version {}", name,
+                ready.getTransactionID(), ready.getTxnClientVersion());
+
+        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
+                (MutableCompositeModification) ready.getModification());
+        cohortCache.put(ready.getTransactionID(), cohortEntry);
+
+        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+            // Return our actor path as we'll handle the three phase commit except if the Tx client
+            // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
+            // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
+            // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
+            ActorRef replyActorPath = shard.self();
+            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+                log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
+                replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                        ready.getTransactionID()));
+            }
 
 
-        cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
+            ReadyTransactionReply readyTransactionReply =
+                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+                            ready.getTxnClientVersion());
+            sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+                readyTransactionReply, shard.self());
+        } else {
+            if(ready.isDoImmediateCommit()) {
+                cohortEntry.setDoImmediateCommit(true);
+                cohortEntry.setReplySender(sender);
+                cohortEntry.setShard(shard);
+                handleCanCommit(cohortEntry);
+            } else {
+                // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
+                // front-end so send back a ReadyTransactionReply with our actor path.
+                sender.tell(readyTransactionReply(shard), shard.self());
+            }
+        }
     }
 
     /**
     }
 
     /**
@@ -118,7 +157,7 @@ public class ShardCommitCoordinator {
      *
      * @throws ExecutionException if an error occurs loading the cache
      */
      *
      * @throws ExecutionException if an error occurs loading the cache
      */
-    public boolean handleTransactionModifications(BatchedModifications batched)
+    boolean handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
             throws ExecutionException {
         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
         if(cohortEntry == null) {
             throws ExecutionException {
         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
         if(cohortEntry == null) {
@@ -142,43 +181,30 @@ public class ShardCommitCoordinator {
                         batched.getTransactionID(), batched.getVersion());
             }
 
                         batched.getTransactionID(), batched.getVersion());
             }
 
-            cohortEntry.ready(cohortDecorator);
+            cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
+
+            if(batched.isDoCommitOnReady()) {
+                cohortEntry.setReplySender(sender);
+                cohortEntry.setShard(shard);
+                handleCanCommit(cohortEntry);
+            } else {
+                sender.tell(readyTransactionReply(shard), shard.self());
+            }
+        } else {
+            sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
         }
 
         return batched.isReady();
     }
 
         }
 
         return batched.isReady();
     }
 
-    /**
-     * This method handles the canCommit phase for a transaction.
-     *
-     * @param canCommit the CanCommitTransaction message
-     * @param sender the actor that sent the message
-     * @param shard the transaction's shard actor
-     */
-    public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
-            final ActorRef shard) {
-        String transactionID = canCommit.getTransactionID();
+    private void handleCanCommit(CohortEntry cohortEntry) {
+        String transactionID = cohortEntry.getTransactionID();
+
         if(log.isDebugEnabled()) {
             log.debug("{}: Processing canCommit for transaction {} for shard {}",
         if(log.isDebugEnabled()) {
             log.debug("{}: Processing canCommit for transaction {} for shard {}",
-                    name, transactionID, shard.path());
-        }
-
-        // Lookup the cohort entry that was cached previously (or should have been) by
-        // transactionReady (via the ForwardedReadyTransaction message).
-        final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
-        if(cohortEntry == null) {
-            // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
-            // between canCommit and ready and the entry was expired from the cache.
-            IllegalStateException ex = new IllegalStateException(
-                    String.format("%s: No cohort entry found for transaction %s", name, transactionID));
-            log.error(ex.getMessage());
-            sender.tell(new Status.Failure(ex), shard);
-            return;
+                    name, transactionID, cohortEntry.getShard().self().path());
         }
 
         }
 
-        cohortEntry.setCanCommitSender(sender);
-        cohortEntry.setShard(shard);
-
         if(currentCohortEntry != null) {
             // There's already a Tx commit in progress - attempt to queue this entry to be
             // committed after the current Tx completes.
         if(currentCohortEntry != null) {
             // There's already a Tx commit in progress - attempt to queue this entry to be
             // committed after the current Tx completes.
@@ -195,7 +221,7 @@ public class ShardCommitCoordinator {
                                       " capacity %d has been reached.",
                                       name, transactionID, queueCapacity));
                 log.error(ex.getMessage());
                                       " capacity %d has been reached.",
                                       name, transactionID, queueCapacity));
                 log.error(ex.getMessage());
-                sender.tell(new Status.Failure(ex), shard);
+                cohortEntry.getReplySender().tell(new Status.Failure(ex), cohortEntry.getShard().self());
             }
         } else {
             // No Tx commit currently in progress - make this the current entry and proceed with
             }
         } else {
             // No Tx commit currently in progress - make this the current entry and proceed with
@@ -207,29 +233,119 @@ public class ShardCommitCoordinator {
         }
     }
 
         }
     }
 
+    /**
+     * This method handles the canCommit phase for a transaction.
+     *
+     * @param canCommit the CanCommitTransaction message
+     * @param sender the actor that sent the message
+     * @param shard the transaction's shard actor
+     */
+    public void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+        // Lookup the cohort entry that was cached previously (or should have been) by
+        // transactionReady (via the ForwardedReadyTransaction message).
+        final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
+        if(cohortEntry == null) {
+            // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
+            // between canCommit and ready and the entry was expired from the cache.
+            IllegalStateException ex = new IllegalStateException(
+                    String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+            log.error(ex.getMessage());
+            sender.tell(new Status.Failure(ex), shard.self());
+            return;
+        }
+
+        cohortEntry.setReplySender(sender);
+        cohortEntry.setShard(shard);
+
+        handleCanCommit(cohortEntry);
+    }
+
     private void doCanCommit(final CohortEntry cohortEntry) {
 
     private void doCanCommit(final CohortEntry cohortEntry) {
 
+        boolean canCommit = false;
         try {
             // We block on the future here so we don't have to worry about possibly accessing our
             // state on a different thread outside of our dispatcher. Also, the data store
             // currently uses a same thread executor anyway.
         try {
             // We block on the future here so we don't have to worry about possibly accessing our
             // state on a different thread outside of our dispatcher. Also, the data store
             // currently uses a same thread executor anyway.
-            Boolean canCommit = cohortEntry.getCohort().canCommit().get();
+            canCommit = cohortEntry.getCohort().canCommit().get();
+
+            if(cohortEntry.isDoImmediateCommit()) {
+                if(canCommit) {
+                    doCommit(cohortEntry);
+                } else {
+                    cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
+                                "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
+                }
+            } else {
+                cohortEntry.getReplySender().tell(
+                        canCommit ? CanCommitTransactionReply.YES.toSerializable() :
+                            CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
+            }
+        } catch (Exception e) {
+            log.debug("{}: An exception occurred during canCommit: {}", name, e);
 
 
-            cohortEntry.getCanCommitSender().tell(
-                    canCommit ? CanCommitTransactionReply.YES.toSerializable() :
-                        CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
+            Throwable failure = e;
+            if(e instanceof ExecutionException) {
+                failure = e.getCause();
+            }
 
 
+            cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
+        } finally {
             if(!canCommit) {
             if(!canCommit) {
-                // Remove the entry from the cache now since the Tx will be aborted.
-                removeCohortEntry(cohortEntry.getTransactionID());
+                // Remove the entry from the cache now.
+                currentTransactionComplete(cohortEntry.getTransactionID(), true);
             }
             }
-        } catch (InterruptedException | ExecutionException e) {
-            log.debug("{}: An exception occurred during canCommit: {}", name, e);
+        }
+    }
+
+    private boolean doCommit(CohortEntry cohortEntry) {
+        log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
 
 
-            // Remove the entry from the cache now since the Tx will be aborted.
-            removeCohortEntry(cohortEntry.getTransactionID());
-            cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
+        boolean success = false;
+
+        // We perform the preCommit phase here atomically with the commit phase. This is an
+        // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
+        // coordination of preCommit across shards in case of failure but preCommit should not
+        // normally fail since we ensure only one concurrent 3-phase commit.
+
+        try {
+            // We block on the future here so we don't have to worry about possibly accessing our
+            // state on a different thread outside of our dispatcher. Also, the data store
+            // currently uses a same thread executor anyway.
+            cohortEntry.getCohort().preCommit().get();
+
+            cohortEntry.getShard().continueCommit(cohortEntry);
+
+            cohortEntry.updateLastAccessTime();
+
+            success = true;
+        } catch (Exception e) {
+            log.error("{} An exception occurred while preCommitting transaction {}",
+                    name, cohortEntry.getTransactionID(), e);
+            cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
+
+            currentTransactionComplete(cohortEntry.getTransactionID(), true);
+        }
+
+        return success;
+    }
+
+    boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+        // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
+        // this transaction.
+        final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
+        if(cohortEntry == null) {
+            // We're not the current Tx - the Tx was likely expired b/c it took too long in
+            // between the canCommit and commit messages.
+            IllegalStateException ex = new IllegalStateException(
+                    String.format("%s: Cannot commit transaction %s - it is not the current transaction",
+                            name, transactionID));
+            log.error(ex.getMessage());
+            sender.tell(new akka.actor.Status.Failure(ex), shard.self());
+            return false;
         }
         }
+
+        return doCommit(cohortEntry);
     }
 
     /**
     }
 
     /**
@@ -302,9 +418,10 @@ public class ShardCommitCoordinator {
         private DOMStoreThreePhaseCommitCohort cohort;
         private final MutableCompositeModification compositeModification;
         private final DOMStoreWriteTransaction transaction;
         private DOMStoreThreePhaseCommitCohort cohort;
         private final MutableCompositeModification compositeModification;
         private final DOMStoreWriteTransaction transaction;
-        private ActorRef canCommitSender;
-        private ActorRef shard;
+        private ActorRef replySender;
+        private Shard shard;
         private long lastAccessTime;
         private long lastAccessTime;
+        private boolean doImmediateCommit;
 
         CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
             this.compositeModification = new MutableCompositeModification();
 
         CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
             this.compositeModification = new MutableCompositeModification();
@@ -347,9 +464,11 @@ public class ShardCommitCoordinator {
             }
         }
 
             }
         }
 
-        void ready(CohortDecorator cohortDecorator) {
+        void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
             Preconditions.checkState(cohort == null, "cohort was already set");
 
             Preconditions.checkState(cohort == null, "cohort was already set");
 
+            setDoImmediateCommit(doImmediateCommit);
+
             cohort = transaction.ready();
 
             if(cohortDecorator != null) {
             cohort = transaction.ready();
 
             if(cohortDecorator != null) {
@@ -358,19 +477,27 @@ public class ShardCommitCoordinator {
             }
         }
 
             }
         }
 
-        ActorRef getCanCommitSender() {
-            return canCommitSender;
+        boolean isDoImmediateCommit() {
+            return doImmediateCommit;
+        }
+
+        void setDoImmediateCommit(boolean doImmediateCommit) {
+            this.doImmediateCommit = doImmediateCommit;
+        }
+
+        ActorRef getReplySender() {
+            return replySender;
         }
 
         }
 
-        void setCanCommitSender(ActorRef canCommitSender) {
-            this.canCommitSender = canCommitSender;
+        void setReplySender(ActorRef replySender) {
+            this.replySender = replySender;
         }
 
         }
 
-        ActorRef getShard() {
+        Shard getShard() {
             return shard;
         }
 
             return shard;
         }
 
-        void setShard(ActorRef shard) {
+        void setShard(Shard shard) {
             this.shard = shard;
         }
 
             this.shard = shard;
         }
 
index 1d5b1d8e1b99bf35f5fabe8ab3723892dd7af193..424ab2052c13c6dd545d7aac1f53b4a5a054daae 100644 (file)
@@ -61,9 +61,9 @@ public class ShardWriteTransaction extends ShardTransaction {
         if (message instanceof BatchedModifications) {
             batchedModifications((BatchedModifications)message);
         } else if (message instanceof ReadyTransaction) {
         if (message instanceof BatchedModifications) {
             batchedModifications((BatchedModifications)message);
         } else if (message instanceof ReadyTransaction) {
-            readyTransaction(transaction, !SERIALIZED_REPLY);
+            readyTransaction(transaction, !SERIALIZED_REPLY, false);
         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(transaction, SERIALIZED_REPLY);
+            readyTransaction(transaction, SERIALIZED_REPLY, false);
         } else if(WriteData.isSerializedType(message)) {
             writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(WriteData.isSerializedType(message)) {
             writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
 
@@ -100,7 +100,7 @@ public class ShardWriteTransaction extends ShardTransaction {
                             totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
                 }
 
                             totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
                 }
 
-                readyTransaction(transaction, false);
+                readyTransaction(transaction, false, batched.isDoCommitOnReady());
             } else {
                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
             }
             } else {
                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
             }
@@ -162,7 +162,8 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
         }
     }
 
-    private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized) {
+    private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized,
+            boolean doImmediateCommit) {
         String transactionID = getTransactionID();
 
         LOG.debug("readyTransaction : {}", transactionID);
         String transactionID = getTransactionID();
 
         LOG.debug("readyTransaction : {}", transactionID);
@@ -170,7 +171,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
         getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
         getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
-                cohort, compositeModification, returnSerialized), getContext());
+                cohort, compositeModification, returnSerialized, doImmediateCommit), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
         getSelf().tell(PoisonPill.getInstance(), getSelf());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
         getSelf().tell(PoisonPill.getInstance(), getSelf());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SingleCommitCohortProxy.java
new file mode 100644 (file)
index 0000000..e340859
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Arrays;
+import java.util.List;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit
+ * to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the
+ * shard as an optimization. Otherwise the 3-phase commit to the shard is delegated to a
+ * ThreePhaseCommitCohortProxy instance (this is for backwards compatibility with pre-Lithium versions).
+ *
+ * @author Thomas Pantelis
+ */
+class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort<Object> {
+    private static final Logger LOG = LoggerFactory.getLogger(SingleCommitCohortProxy.class);
+
+    private final ActorContext actorContext;
+    private final Future<Object> cohortFuture;
+    private final String transactionId;
+    private volatile DOMStoreThreePhaseCommitCohort delegateCohort = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
+    private final OperationCallback.Reference operationCallbackRef;
+
+    SingleCommitCohortProxy(ActorContext actorContext, Future<Object> cohortFuture, String transactionId,
+            OperationCallback.Reference operationCallbackRef) {
+        this.actorContext = actorContext;
+        this.cohortFuture = cohortFuture;
+        this.transactionId = transactionId;
+        this.operationCallbackRef = operationCallbackRef;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        LOG.debug("Tx {} canCommit", transactionId);
+
+        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+        cohortFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object cohortResponse) {
+                if(failure != null) {
+                    operationCallbackRef.get().failure();
+                    returnFuture.setException(failure);
+                    return;
+                }
+
+                operationCallbackRef.get().success();
+
+                if(cohortResponse instanceof ActorSelection) {
+                    handlePreLithiumActorCohort((ActorSelection)cohortResponse, returnFuture);
+                    return;
+                }
+
+                LOG.debug("Tx {} successfully completed direct commit", transactionId);
+
+                // The Future was the result of a direct commit to the shard, essentially eliding the
+                // front-end 3PC coordination. We don't really care about the specific Future
+                // response object, only that it completed successfully. At this point the Tx is complete
+                // so return true. The subsequent preCommit and commit phases will be no-ops, ie return
+                // immediate success, to complete the 3PC for the front-end.
+                returnFuture.set(Boolean.TRUE);
+            }
+        }, actorContext.getClientDispatcher());
+
+        return returnFuture;
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        return delegateCohort.preCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        return delegateCohort.abort();
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        return delegateCohort.commit();
+    }
+
+    @Override
+    List<Future<Object>> getCohortFutures() {
+        return Arrays.asList(cohortFuture);
+    }
+
+    private void handlePreLithiumActorCohort(ActorSelection actorSelection, final SettableFuture<Boolean> returnFuture) {
+        // Handle backwards compatibility. An ActorSelection response would be returned from a
+        // pre-Lithium version. In this case delegate to a ThreePhaseCommitCohortProxy.
+        delegateCohort = new ThreePhaseCommitCohortProxy(actorContext,
+                Arrays.asList(Futures.successful(actorSelection)), transactionId);
+        com.google.common.util.concurrent.Futures.addCallback(delegateCohort.canCommit(), new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(Boolean canCommit) {
+                returnFuture.set(canCommit);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                returnFuture.setException(t);
+            }
+        });
+    }
+}
index 3a2bcf2336713d2af695065792d62358de4ce9c2..57749a1a736bc935da74222ca984d834083d6206 100644 (file)
@@ -32,30 +32,14 @@ import scala.runtime.AbstractFunction1;
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
  */
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
  */
-public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort {
+public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
-    private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
-            com.google.common.util.concurrent.Futures.immediateFuture(null);
-
     private final ActorContext actorContext;
     private final List<Future<ActorSelection>> cohortFutures;
     private volatile List<ActorSelection> cohorts;
     private final String transactionId;
     private final ActorContext actorContext;
     private final List<Future<ActorSelection>> cohortFutures;
     private volatile List<ActorSelection> cohorts;
     private final String transactionId;
-    private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
-        @Override
-        public void run() {
-        }
-
-        @Override
-        public void success() {
-        }
-
-        @Override
-        public void failure() {
-        }
-    };
 
     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
             List<Future<ActorSelection>> cohortFutures, String transactionId) {
 
     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
             List<Future<ActorSelection>> cohortFutures, String transactionId) {
@@ -190,7 +174,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort
     public ListenableFuture<Void> preCommit() {
         // We don't need to do anything here - preCommit is done atomically with the commit phase
         // by the shard.
     public ListenableFuture<Void> preCommit() {
         // We don't need to do anything here - preCommit is done atomically with the commit phase
         // by the shard.
-        return IMMEDIATE_SUCCESS;
+        return IMMEDIATE_VOID_SUCCESS;
     }
 
     @Override
     }
 
     @Override
@@ -207,7 +191,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort
 
     @Override
     public ListenableFuture<Void> commit() {
 
     @Override
     public ListenableFuture<Void> commit() {
-        OperationCallback operationCallback = cohortFutures.isEmpty() ? NO_OP_CALLBACK :
+        OperationCallback operationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
                 new TransactionRateLimitingCallback(actorContext);
 
         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
                 new TransactionRateLimitingCallback(actorContext);
 
         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
@@ -216,7 +200,8 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort
 
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
                                                  final Class<?> expectedResponseClass, final boolean propagateException) {
 
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
                                                  final Class<?> expectedResponseClass, final boolean propagateException) {
-        return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
+        return voidOperation(operationName, message, expectedResponseClass, propagateException,
+                OperationCallback.NO_OP_CALLBACK);
     }
 
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
     }
 
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
index 11066edd543413de08591102ba2541d7baec9a0f..5531b5f5401676a13d02a08b69683a2f31ca077a 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorSelection;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
@@ -30,7 +29,7 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
     private interface State {
         boolean isReady();
 
     private interface State {
         boolean isReady();
 
-        List<Future<ActorSelection>> getPreviousReadyFutures();
+        List<Future<Object>> getPreviousReadyFutures();
     }
 
     private static class Allocated implements State {
     }
 
     private static class Allocated implements State {
@@ -46,14 +45,14 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
         }
 
         @Override
         }
 
         @Override
-        public List<Future<ActorSelection>> getPreviousReadyFutures() {
+        public List<Future<Object>> getPreviousReadyFutures() {
             return transaction.getReadyFutures();
         }
     }
 
     private static abstract class AbstractDefaultState implements State {
         @Override
             return transaction.getReadyFutures();
         }
     }
 
     private static abstract class AbstractDefaultState implements State {
         @Override
-        public List<Future<ActorSelection>> getPreviousReadyFutures() {
+        public List<Future<Object>> getPreviousReadyFutures() {
             return Collections.emptyList();
         }
     }
             return Collections.emptyList();
         }
     }
index bc6e5f229fe04aacffd8e719c138ba394b27bbd8..4eea785964b1e8ef9b4023390547a813408863e4 100644 (file)
@@ -32,4 +32,8 @@ interface TransactionContext {
     void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture);
 
     void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
     void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture);
 
     void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
+
+    boolean supportsDirectCommit();
+
+    Future<Object> directCommit();
 }
 }
index c722918c5cfed8ad7062e63911ae60fa45aad7fa..a9deeaaeba0806a1ad355dfeb40216d4a624219f 100644 (file)
@@ -89,13 +89,27 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
     }
 
         actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
     }
 
+    @Override
+    public boolean supportsDirectCommit() {
+        return true;
+    }
+
+    @Override
+    public Future<Object> directCommit() {
+        LOG.debug("Tx {} directCommit called", getIdentifier());
+
+        // Send the remaining batched modifications, if any, with the ready flag set.
+
+        return sendBatchedModifications(true, true);
+    }
+
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the remaining batched modifications, if any, with the ready flag set.
 
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the remaining batched modifications, if any, with the ready flag set.
 
-        Future<Object> lastModificationsFuture = sendBatchedModifications(true);
+        Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
 
         return transformReadyReply(lastModificationsFuture);
     }
 
         return transformReadyReply(lastModificationsFuture);
     }
@@ -145,10 +159,10 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     }
 
     protected Future<Object> sendBatchedModifications() {
     }
 
     protected Future<Object> sendBatchedModifications() {
-        return sendBatchedModifications(false);
+        return sendBatchedModifications(false, false);
     }
 
     }
 
-    protected Future<Object> sendBatchedModifications(boolean ready) {
+    protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
         Future<Object> sent = null;
         if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
             if(batchedModifications == null) {
         Future<Object> sent = null;
         if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
             if(batchedModifications == null) {
@@ -161,6 +175,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             }
 
             batchedModifications.setReady(ready);
             }
 
             batchedModifications.setReady(ready);
+            batchedModifications.setDoCommitOnReady(doCommitOnReady);
             batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
             sent = executeOperationAsync(batchedModifications);
 
             batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
             sent = executeOperationAsync(batchedModifications);
 
index 71799c92d40a9bca75304a196f1c500895a04b09..f12fdd99eab792080069f38bebe1a721a71f1b32 100644 (file)
@@ -354,7 +354,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     @Override
     }
 
     @Override
-    public AbstractThreePhaseCommitCohort ready() {
+    public AbstractThreePhaseCommitCohort<?> ready() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Read-only transactions cannot be readied");
 
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Read-only transactions cannot be readied");
 
@@ -371,10 +371,55 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         throttleOperation(txFutureCallbackMap.size());
 
 
         throttleOperation(txFutureCallbackMap.size());
 
+        final boolean isSingleShard = txFutureCallbackMap.size() == 1;
+        return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
+        TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
+
+        LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
+                txFutureCallback.getShardName(), transactionChainId);
+
+        final OperationCallback.Reference operationCallbackRef =
+                new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
+        final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+        final Future future;
+        if (transactionContext != null) {
+            // avoid the creation of a promise and a TransactionOperation
+            future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
+        } else {
+            final Promise promise = akka.dispatch.Futures.promise();
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
+                }
+            });
+            future = promise.future();
+        }
+
+        return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
+    }
+
+    private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
+            OperationCallback.Reference operationCallbackRef) {
+        if(transactionContext.supportsDirectCommit()) {
+            TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
+            operationCallbackRef.set(rateLimitingCallback);
+            rateLimitingCallback.run();
+            return transactionContext.directCommit();
+        } else {
+            return transactionContext.readyTransaction();
+        }
+    }
+
+    private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
-            LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
+            LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
                         txFutureCallback.getShardName(), transactionChainId);
 
             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
                         txFutureCallback.getShardName(), transactionChainId);
 
             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
@@ -396,8 +441,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             cohortFutures.add(future);
         }
 
             cohortFutures.add(future);
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
-            getIdentifier().toString());
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
     }
 
     @Override
     }
 
     @Override
index d17497c18c1acccb4f986be231e0adf3c7b39fab..8b6cce6c5bd392101a6e91040a08596ac743dfe5 100644 (file)
@@ -85,4 +85,14 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
 
         return readyTxReply.getCohortPath();
     }
 
         return readyTxReply.getCohortPath();
     }
+
+    @Override
+    public boolean supportsDirectCommit() {
+        return false;
+    }
+
+    @Override
+    public Future<Object> directCommit() {
+        throw new UnsupportedOperationException("directCommit is not supported for " + getClass());
+    }
 }
 }
index 86f96f57d0f3cb0c284a1a902bd820baaac9e82d..f95473f8a6b3edbdd5ed6cfb5dbe7ad789f55593 100644 (file)
@@ -22,6 +22,7 @@ public class BatchedModifications extends MutableCompositeModification implement
     private static final long serialVersionUID = 1L;
 
     private boolean ready;
     private static final long serialVersionUID = 1L;
 
     private boolean ready;
+    private boolean doCommitOnReady;
     private int totalMessagesSent;
     private String transactionID;
     private String transactionChainID;
     private int totalMessagesSent;
     private String transactionID;
     private String transactionChainID;
@@ -43,6 +44,14 @@ public class BatchedModifications extends MutableCompositeModification implement
         this.ready = ready;
     }
 
         this.ready = ready;
     }
 
+    public boolean isDoCommitOnReady() {
+        return doCommitOnReady;
+    }
+
+    public void setDoCommitOnReady(boolean doCommitOnReady) {
+        this.doCommitOnReady = doCommitOnReady;
+    }
+
     public int getTotalMessagesSent() {
         return totalMessagesSent;
     }
     public int getTotalMessagesSent() {
         return totalMessagesSent;
     }
@@ -66,6 +75,7 @@ public class BatchedModifications extends MutableCompositeModification implement
         transactionChainID = in.readUTF();
         ready = in.readBoolean();
         totalMessagesSent = in.readInt();
         transactionChainID = in.readUTF();
         ready = in.readBoolean();
         totalMessagesSent = in.readInt();
+        doCommitOnReady = in.readBoolean();
     }
 
     @Override
     }
 
     @Override
@@ -75,6 +85,7 @@ public class BatchedModifications extends MutableCompositeModification implement
         out.writeUTF(transactionChainID);
         out.writeBoolean(ready);
         out.writeInt(totalMessagesSent);
         out.writeUTF(transactionChainID);
         out.writeBoolean(ready);
         out.writeInt(totalMessagesSent);
+        out.writeBoolean(doCommitOnReady);
     }
 
     @Override
     }
 
     @Override
index 0f872430599d1d75869b2404e5f93052d2986cec..cdd7859a3019d1dded7d53ae84e8b39d6acfa717 100644 (file)
@@ -20,16 +20,18 @@ public class ForwardedReadyTransaction {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
+    private final boolean doImmediateCommit;
     private final short txnClientVersion;
 
     public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
             DOMStoreThreePhaseCommitCohort cohort, Modification modification,
     private final short txnClientVersion;
 
     public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
             DOMStoreThreePhaseCommitCohort cohort, Modification modification,
-            boolean returnSerialized) {
+            boolean returnSerialized, boolean doImmediateCommit) {
         this.transactionID = transactionID;
         this.cohort = cohort;
         this.modification = modification;
         this.returnSerialized = returnSerialized;
         this.txnClientVersion = txnClientVersion;
         this.transactionID = transactionID;
         this.cohort = cohort;
         this.modification = modification;
         this.returnSerialized = returnSerialized;
         this.txnClientVersion = txnClientVersion;
+        this.doImmediateCommit = doImmediateCommit;
     }
 
     public String getTransactionID() {
     }
 
     public String getTransactionID() {
@@ -51,4 +53,8 @@ public class ForwardedReadyTransaction {
     public short getTxnClientVersion() {
         return txnClientVersion;
     }
     public short getTxnClientVersion() {
         return txnClientVersion;
     }
+
+    public boolean isDoImmediateCommit() {
+        return doImmediateCommit;
+    }
 }
 }
index 34f0164504fe666e7cfb76090c57ed5b2362a268..f6a103ffb88e9a6b8b89aeb4794c4468bd7a84e1 100644 (file)
@@ -222,7 +222,11 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         doAnswer(new Answer<ListenableFuture<Void>>() {
             @Override
             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
         doAnswer(new Answer<ListenableFuture<Void>>() {
             @Override
             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
-                return actual.preCommit();
+                if(preCommit != null) {
+                    return preCommit.apply(actual);
+                } else {
+                    return actual.preCommit();
+                }
             }
         }).when(cohort).preCommit();
 
             }
         }).when(cohort).preCommit();
 
index 6a1e12a96b6e9cd2eb7a3a89204a87d21a64f1da..be965608144cd68dca83b0a2cf8835b82bc30886 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
@@ -24,13 +23,19 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import akka.testkit.JavaTestKit;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import akka.testkit.JavaTestKit;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
 import org.junit.Before;
 import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -45,6 +50,7 @@ import org.opendaylight.controller.cluster.datastore.TransactionProxy.Transactio
 import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
@@ -55,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.modification.AbstractModifi
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
@@ -81,7 +88,24 @@ public abstract class AbstractTransactionProxyTest {
 
     private static ActorSystem system;
 
 
     private static ActorSystem system;
 
-    private final Configuration configuration = new MockConfiguration();
+    private final Configuration configuration = new MockConfiguration() {
+        @Override
+        public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
+            return ImmutableMap.<String, ShardStrategy>builder().put(
+                    "junk", new ShardStrategy() {
+                        @Override
+                        public String findShard(YangInstanceIdentifier path) {
+                            return "junk";
+                        }
+                    }).build();
+        }
+
+        @Override
+        public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
+            return TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace) ?
+                    Optional.of("junk") : Optional.<String>absent();
+        }
+    };
 
     @Mock
     protected ActorContext mockActorContext;
 
     @Mock
     protected ActorContext mockActorContext;
@@ -126,6 +150,9 @@ public abstract class AbstractTransactionProxyTest {
         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
+        Timer timer = new MetricRegistry().timer("test");
+        doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class));
+
         ShardStrategyFactory.setConfiguration(configuration);
     }
 
         ShardStrategyFactory.setConfiguration(configuration);
     }
 
@@ -246,8 +273,13 @@ public abstract class AbstractTransactionProxyTest {
     }
 
     protected void expectBatchedModificationsReady(ActorRef actorRef) {
     }
 
     protected void expectBatchedModificationsReady(ActorRef actorRef) {
-        doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+        expectBatchedModificationsReady(actorRef, false);
+    }
+
+    protected void expectBatchedModificationsReady(ActorRef actorRef, boolean doCommitOnReady) {
+        doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
+            readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                    eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
     protected void expectBatchedModifications(int count) {
     }
 
     protected void expectBatchedModifications(int count) {
@@ -274,6 +306,10 @@ public abstract class AbstractTransactionProxyTest {
     }
 
     protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
     }
 
     protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
+        return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
+    }
+
+    protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         log.info("Created mock shard actor {}", actorRef);
 
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         log.info("Created mock shard actor {}", actorRef);
 
@@ -281,7 +317,7 @@ public abstract class AbstractTransactionProxyTest {
                 when(mockActorContext).actorSelection(actorRef.path().toString());
 
         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
 
         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
-                when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+                when(mockActorContext).findPrimaryShardAsync(eq(shardName));
 
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
 
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
@@ -291,8 +327,8 @@ public abstract class AbstractTransactionProxyTest {
     }
 
     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
     }
 
     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
-            TransactionType type, int transactionVersion) {
-        ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
+            TransactionType type, int transactionVersion, String shardName) {
+        ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName);
 
         return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
                 memberName, shardActorRef);
 
         return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
                 memberName, shardActorRef);
@@ -321,9 +357,15 @@ public abstract class AbstractTransactionProxyTest {
     }
 
     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
     }
 
     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
-        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
+                DefaultShardStrategy.DEFAULT_SHARD);
     }
 
     }
 
+    protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type,
+            String shardName) {
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
+                shardName);
+    }
 
     protected void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
             throws Throwable {
 
     protected void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
             throws Throwable {
@@ -362,14 +404,20 @@ public abstract class AbstractTransactionProxyTest {
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
-        verifyBatchedModifications(batchedModifications.get(0), expIsReady, expected);
+        verifyBatchedModifications(batchedModifications.get(0), expIsReady, expIsReady, expected);
     }
 
     protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) {
     }
 
     protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) {
+        verifyBatchedModifications(message, expIsReady, false, expected);
+    }
+
+    protected void verifyBatchedModifications(Object message, boolean expIsReady, boolean expIsDoCommitOnReady,
+            Modification... expected) {
         assertEquals("Message type", BatchedModifications.class, message.getClass());
         BatchedModifications batchedModifications = (BatchedModifications)message;
         assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
         assertEquals("isReady", expIsReady, batchedModifications.isReady());
         assertEquals("Message type", BatchedModifications.class, message.getClass());
         BatchedModifications batchedModifications = (BatchedModifications)message;
         assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
         assertEquals("isReady", expIsReady, batchedModifications.isReady());
+        assertEquals("isDoCommitOnReady", expIsDoCommitOnReady, batchedModifications.isDoCommitOnReady());
         for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
             Modification actual = batchedModifications.getModifications().get(i);
             assertEquals("Modification type", expected[i].getClass(), actual.getClass());
         for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
             Modification actual = batchedModifications.getModifications().get(i);
             assertEquals("Modification type", expected[i].getClass(), actual.getClass());
@@ -382,27 +430,44 @@ public abstract class AbstractTransactionProxyTest {
         }
     }
 
         }
     }
 
-    protected void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
+    protected void verifyCohortFutures(AbstractThreePhaseCommitCohort<?> proxy,
             Object... expReplies) throws Exception {
             assertEquals("getReadyOperationFutures size", expReplies.length,
                     proxy.getCohortFutures().size());
 
             Object... expReplies) throws Exception {
             assertEquals("getReadyOperationFutures size", expReplies.length,
                     proxy.getCohortFutures().size());
 
-            int i = 0;
-            for( Future<ActorSelection> future: proxy.getCohortFutures()) {
+            List<Object> futureResults = new ArrayList<>();
+            for( Future<?> future: proxy.getCohortFutures()) {
                 assertNotNull("Ready operation Future is null", future);
                 assertNotNull("Ready operation Future is null", future);
+                try {
+                    futureResults.add(Await.result(future, Duration.create(5, TimeUnit.SECONDS)));
+                } catch(Exception e) {
+                    futureResults.add(e);
+                }
+            }
 
 
-                Object expReply = expReplies[i++];
-                if(expReply instanceof ActorSelection) {
-                    ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
-                    assertEquals("Cohort actor path", expReply, actual);
-                } else {
-                    try {
-                        Await.result(future, Duration.create(5, TimeUnit.SECONDS));
-                        fail("Expected exception from ready operation Future");
-                    } catch(Exception e) {
-                        assertTrue(String.format("Expected exception type %s. Actual %s",
-                                expReply, e.getClass()), ((Class<?>)expReply).isInstance(e));
+            for(int i = 0; i < expReplies.length; i++) {
+                Object expReply = expReplies[i];
+                boolean found = false;
+                Iterator<?> iter = futureResults.iterator();
+                while(iter.hasNext()) {
+                    Object actual = iter.next();
+                    if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(expReply) &&
+                       CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(actual)) {
+                        found = true;
+                    } else if(expReply instanceof ActorSelection && Objects.equal(expReply, actual)) {
+                        found = true;
+                    } else if(expReply instanceof Class && ((Class<?>)expReply).isInstance(actual)) {
+                        found = true;
                     }
                     }
+
+                    if(found) {
+                        iter.remove();
+                        break;
+                    }
+                }
+
+                if(!found) {
+                    fail(String.format("No cohort Future response found for %s. Actual: %s", expReply, futureResults));
                 }
             }
         }
                 }
             }
         }
index a8384d8758a7bc3523bf75f1f3e31480d21ae737..94cc6e5e59cf0efc55fe3af56c7d7a90cf7bc4c3 100644 (file)
@@ -147,9 +147,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
         }};
     }
 
-    private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception {
+    private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
+            final boolean writeOnly) throws Exception {
         new IntegrationTestKit(getSystem()) {{
         new IntegrationTestKit(getSystem()) {{
-            String testName = "testTransactionWritesWithShardNotInitiallyReady";
             String shardName = "test-1";
 
             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
             String shardName = "test-1";
 
             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
@@ -241,12 +241,12 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
     @Test
     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
     @Test
     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-        testTransactionWritesWithShardNotInitiallyReady(true);
+        testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
     }
 
     @Test
     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
     }
 
     @Test
     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
-        testTransactionWritesWithShardNotInitiallyReady(false);
+        testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
     }
 
     @Test
     }
 
     @Test
@@ -873,7 +873,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }
 
         void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
         }
 
         void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
-            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+            Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
             assertEquals("canCommit", true, canCommit);
             cohort.preCommit().get(5, TimeUnit.SECONDS);
             cohort.commit().get(5, TimeUnit.SECONDS);
             assertEquals("canCommit", true, canCommit);
             cohort.preCommit().get(5, TimeUnit.SECONDS);
             cohort.commit().get(5, TimeUnit.SECONDS);
index e3b82df1743e75c433cec193d54a2cbfbd696319..72f672794ab16c9bf89920bcf855cb72fd4bcc63 100644 (file)
@@ -8,6 +8,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -469,7 +470,7 @@ public class ShardTest extends AbstractShardTest {
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
+                    cohort1, modification1, true, false), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.class));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.class));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -484,11 +485,11 @@ public class ShardTest extends AbstractShardTest {
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
+                    cohort2, modification2, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true), getRef());
+                    cohort3, modification3, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
@@ -595,18 +596,7 @@ public class ShardTest extends AbstractShardTest {
 
             // Verify data in the data store.
 
 
             // Verify data in the data store.
 
-            NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
-            assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
-                    outerList.getValue() instanceof Iterable);
-            Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
-                       entry instanceof MapEntryNode);
-            MapEntryNode mapEntry = (MapEntryNode)entry;
-            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
-                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
-            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
-            assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+            verifyOuterListEntry(shard, 1);
 
             verifyLastApplied(shard, 2);
 
 
             verifyLastApplied(shard, 2);
 
@@ -615,25 +605,25 @@ public class ShardTest extends AbstractShardTest {
     }
 
     private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
     }
 
     private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data, boolean ready) {
-        return newBatchedModifications(transactionID, null, path, data, ready);
+            NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
+        return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
     }
 
     private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
     }
 
     private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
-            YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
+            YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
         batched.addModification(new WriteModification(path, data));
         batched.setReady(ready);
         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
         batched.addModification(new WriteModification(path, data));
         batched.setReady(ready);
+        batched.setDoCommitOnReady(doCommitOnReady);
         return batched;
     }
 
         return batched;
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     @Test
-    public void testMultipleBatchedModifications() throws Throwable {
+    public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testMultipleBatchedModifications");
+                    "testBatchedModificationsWithNoCommitOnReady");
 
             waitUntilLeader(shard);
 
 
             waitUntilLeader(shard);
 
@@ -657,18 +647,18 @@ public class ShardTest extends AbstractShardTest {
             // Send a BatchedModifications to start a transaction.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
             // Send a BatchedModifications to start a transaction.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send a couple more BatchedModifications.
 
             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
             expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -690,23 +680,85 @@ public class ShardTest extends AbstractShardTest {
 
             // Verify data in the data store.
 
 
             // Verify data in the data store.
 
-            NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
-            assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
-                    outerList.getValue() instanceof Iterable);
-            Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
-                       entry instanceof MapEntryNode);
-            MapEntryNode mapEntry = (MapEntryNode)entry;
-            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
-                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
-            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
-            assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+            verifyOuterListEntry(shard, 1);
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
+    @Test
+    public void testBatchedModificationsWithCommitOnReady() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testBatchedModificationsWithCommitOnReady");
+
+            waitUntilLeader(shard);
+
+            final String transactionID = "tx";
+            FiniteDuration duration = duration("5 seconds");
+
+            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+                @Override
+                public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+                    if(mockCohort.get() == null) {
+                        mockCohort.set(createDelegatingMockCohort("cohort", actual));
+                    }
+
+                    return mockCohort.get();
+                }
+            };
+
+            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+            // Send a BatchedModifications to start a transaction.
+
+            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
+
+            // Send a couple more BatchedModifications.
+
+            shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
+
+            shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
+                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            InOrder inOrder = inOrder(mockCohort.get());
+            inOrder.verify(mockCohort.get()).canCommit();
+            inOrder.verify(mockCohort.get()).preCommit();
+            inOrder.verify(mockCohort.get()).commit();
+
+            // Verify data in the data store.
+
+            verifyOuterListEntry(shard, 1);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @SuppressWarnings("unchecked")
+    private void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
+        NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
+        assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+                outerList.getValue() instanceof Iterable);
+        Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
+        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+                entry instanceof MapEntryNode);
+        MapEntryNode mapEntry = (MapEntryNode)entry;
+        Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+                mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+        assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+        assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
+    }
+
     @Test
     public void testBatchedModificationsOnTransactionChain() throws Throwable {
         new ShardTestKit(getSystem()) {{
     @Test
     public void testBatchedModificationsOnTransactionChain() throws Throwable {
         new ShardTestKit(getSystem()) {{
@@ -727,7 +779,7 @@ public class ShardTest extends AbstractShardTest {
             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
-                    containerNode, true), getRef());
+                    containerNode, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Create a read Tx on the same chain.
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Create a read Tx on the same chain.
@@ -800,6 +852,45 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
         }};
     }
 
+    @Test
+    public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testForwardedReadyTransactionWithImmediateCommit");
+
+            waitUntilLeader(shard);
+
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            String transactionID = "tx1";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+                    TestModel.TEST_PATH, containerNode, modification);
+
+            FiniteDuration duration = duration("5 seconds");
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true, true), getRef());
+
+            expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+            InOrder inOrder = inOrder(cohort);
+            inOrder.verify(cohort).canCommit();
+            inOrder.verify(cohort).preCommit();
+            inOrder.verify(cohort).commit();
+
+            NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+            assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @Test
     public void testCommitWithPersistenceDisabled() throws Throwable {
         dataStoreContextBuilder.persistent(false);
     @Test
     public void testCommitWithPersistenceDisabled() throws Throwable {
         dataStoreContextBuilder.persistent(false);
@@ -826,7 +917,7 @@ public class ShardTest extends AbstractShardTest {
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true), getRef());
+                    cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
@@ -878,7 +969,7 @@ public class ShardTest extends AbstractShardTest {
                 // by the ShardTransaction.
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                 // by the ShardTransaction.
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                        cohort, modification, true), getRef());
+                        cohort, modification, true, false), getRef());
                 expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
                 expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
@@ -933,7 +1024,7 @@ public class ShardTest extends AbstractShardTest {
                 // by the ShardTransaction.
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                 // by the ShardTransaction.
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                        cohort, modification, true), getRef());
+                        cohort, modification, true, false), getRef());
                 expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
                 expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
@@ -973,7 +1064,7 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
 
             waitUntilLeader(shard);
 
-         // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+            // Setup 2 simulated transactions with mock cohorts. The first one fails in the
             // commit phase.
 
             String transactionID1 = "tx1";
             // commit phase.
 
             String transactionID1 = "tx1";
@@ -995,11 +1086,11 @@ public class ShardTest extends AbstractShardTest {
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
+                    cohort1, modification1, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
+                    cohort2, modification2, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1052,37 +1143,66 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
 
             waitUntilLeader(shard);
 
-            String transactionID = "tx1";
-            MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
-            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
+            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
 
             FiniteDuration duration = duration("5 seconds");
+            final Timeout timeout = new Timeout(duration);
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             expectMsgClass(duration, ReadyTransactionReply.class);
 
-            // Send the CanCommitTransaction message.
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            // Send the CanCommitTransaction message for the first Tx.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send the CommitTransaction message. This should send back an error
-            // for preCommit failure.
+            // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
+            // processed after the first Tx completes.
 
 
-            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+            Future<Object> canCommitFuture = Patterns.ask(shard,
+                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+
+            // Send the CommitTransaction message for the first Tx. This should send back an error
+            // and trigger the 2nd Tx to proceed.
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
-            InOrder inOrder = inOrder(cohort);
-            inOrder.verify(cohort).canCommit();
-            inOrder.verify(cohort).preCommit();
+            // Wait for the 2nd Tx to complete the canCommit phase.
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            canCommitFuture.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(final Throwable t, final Object resp) {
+                    latch.countDown();
+                }
+            }, getSystem().dispatcher());
+
+            assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+
+            InOrder inOrder = inOrder(cohort1, cohort2);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort2).canCommit();
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
@@ -1099,7 +1219,7 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
 
             final FiniteDuration duration = duration("5 seconds");
 
-            String transactionID = "tx1";
+            String transactionID1 = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
             MutableCompositeModification modification = new MutableCompositeModification();
             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
@@ -1107,15 +1227,165 @@ public class ShardTest extends AbstractShardTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
-            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             expectMsgClass(duration, akka.actor.Status.Failure.class);
 
+            // Send another can commit to ensure the failed one got cleaned up.
+
+            reset(cohort);
+
+            String transactionID2 = "tx2";
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("getCanCommit", true, reply.getCanCommit());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testCanCommitPhaseFalseResponse() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCanCommitPhaseFalseResponse");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // Send the CanCommitTransaction message.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("getCanCommit", false, reply.getCanCommit());
+
+            // Send another can commit to ensure the failed one got cleaned up.
+
+            reset(cohort);
+
+            String transactionID2 = "tx2";
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            reply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("getCanCommit", true, reply.getCanCommit());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testImmediateCommitWithCanCommitPhaseFailure");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort, modification, true, true), getRef());
+
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+            // Send another can commit to ensure the failed one got cleaned up.
+
+            reset(cohort);
+
+            String transactionID2 = "tx2";
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+            doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+            doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort, modification, true, true), getRef());
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testImmediateCommitWithCanCommitPhaseFalseResponse");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            String transactionID = "tx1";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true, true), getRef());
+
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+            // Send another can commit to ensure the failed one got cleaned up.
+
+            reset(cohort);
+
+            String transactionID2 = "tx2";
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+            doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+            doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort, modification, true, true), getRef());
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
@@ -1159,7 +1429,7 @@ public class ShardTest extends AbstractShardTest {
                     modification, preCommit);
 
             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                     modification, preCommit);
 
             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true), getRef());
+                    cohort, modification, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
@@ -1224,11 +1494,11 @@ public class ShardTest extends AbstractShardTest {
             // Ready the Tx's
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
             // Ready the Tx's
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
+                    cohort1, modification1, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
+                    cohort2, modification2, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
@@ -1241,6 +1511,11 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
 
             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
 
+            // Try to commit the 1st Tx - should fail as it's not the current Tx.
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
+
             // Commit the 2nd Tx.
 
             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
             // Commit the 2nd Tx.
 
             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
@@ -1288,15 +1563,15 @@ public class ShardTest extends AbstractShardTest {
             // Ready the Tx's
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
             // Ready the Tx's
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
+                    cohort1, modification1, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
+                    cohort2, modification2, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true), getRef());
+                    cohort3, modification3, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx.
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx.
@@ -1360,11 +1635,11 @@ public class ShardTest extends AbstractShardTest {
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
+                    cohort1, modification1, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
+                    cohort2, modification2, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
             expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
index c9335f378a8662e7b7ceb488961fb49588d50baf..b22001a4da6c234f6ac7b7167e4f4a4e18d25771 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
@@ -412,11 +413,11 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
     }
 
     @Test
-    public void testOnReceiveBatchedModificationsReady() throws Exception {
+    public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
         new JavaTestKit(getSystem()) {{
 
             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
         new JavaTestKit(getSystem()) {{
 
             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
-                    "testOnReceiveBatchedModificationsReady");
+                    "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
             watcher.watch(transaction);
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
             watcher.watch(transaction);
@@ -443,6 +444,33 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
         }};
     }
 
+    @Test
+    public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+            NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                    withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.addModification(new WriteModification(writePath, writeData));
+            batched.setReady(true);
+            batched.setDoCommitOnReady(true);
+            batched.setTotalMessagesSent(1);
+
+            transaction.tell(batched, getRef());
+            expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+        }};
+    }
+
     @Test(expected=TestException.class)
     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
         new JavaTestKit(getSystem()) {{
     @Test(expected=TestException.class)
     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
         new JavaTestKit(getSystem()) {{
index cc9692bfd91b72f3245f2bb6bd160f12551720b5..93c6ddbe7398be9b2054bb67a5b9b69bac47c09c 100644 (file)
@@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@@ -326,7 +327,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteAfterAsyncRead() throws Throwable {
 
     @Test
     public void testWriteAfterAsyncRead() throws Throwable {
-        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
+        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
 
         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
 
         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
@@ -460,7 +461,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        expectBatchedModificationsReady(actorRef);
+        expectBatchedModificationsReady(actorRef, true);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
@@ -470,16 +471,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        assertTrue(ready instanceof SingleCommitCohortProxy);
 
 
-        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
-        verifyBatchedModifications(batchedModifications.get(0), true,
+        verifyBatchedModifications(batchedModifications.get(0), true, true,
                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
@@ -492,7 +491,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        expectBatchedModificationsReady(actorRef);
+        expectBatchedModificationsReady(actorRef, true);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
@@ -500,16 +499,36 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        assertTrue(ready instanceof SingleCommitCohortProxy);
 
 
-        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
-        verifyBatchedModifications(batchedModifications.get(0), true);
+        verifyBatchedModifications(batchedModifications.get(0), true, true);
+    }
+
+    @Test
+    public void testReadyWithMultipleShardWrites() throws Exception {
+        ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
+
+        expectBatchedModificationsReady(actorRef1);
+        expectBatchedModificationsReady(actorRef2);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+        transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
+        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
+                actorSelection(actorRef2));
     }
 
     @Test
     }
 
     @Test
@@ -520,7 +539,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        expectBatchedModificationsReady(actorRef);
+        expectBatchedModificationsReady(actorRef, true);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -528,16 +547,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        assertTrue(ready instanceof SingleCommitCohortProxy);
 
 
-        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
-        verifyBatchedModifications(batchedModifications.get(0), true,
+        verifyBatchedModifications(batchedModifications.get(0), true, true,
                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
@@ -551,7 +568,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        expectBatchedModificationsReady(actorRef);
+        expectBatchedModificationsReady(actorRef, true);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -559,11 +576,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        assertTrue(ready instanceof SingleCommitCohortProxy);
 
 
-        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
@@ -571,7 +586,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyBatchedModifications(batchedModifications.get(0), false,
                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         verifyBatchedModifications(batchedModifications.get(0), false,
                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
-        verifyBatchedModifications(batchedModifications.get(1), true);
+        verifyBatchedModifications(batchedModifications.get(1), true, true);
 
         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
@@ -593,11 +608,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        assertTrue(ready instanceof SingleCommitCohortProxy);
 
 
-        verifyCohortFutures(proxy, TestException.class);
+        verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
     }
 
     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
     }
 
     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
@@ -615,11 +628,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        assertTrue(ready instanceof SingleCommitCohortProxy);
 
 
-        verifyCohortFutures(proxy, toThrow.getClass());
+        verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
     }
 
     @Test
     }
 
     @Test
@@ -640,27 +651,26 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     @Test
     public void testReadyWithInvalidReplyMessageType() throws Exception {
         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
     @Test
     public void testReadyWithInvalidReplyMessageType() throws Exception {
         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
 
-        //expectBatchedModifications(actorRef, 1);
+        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)),
-                        isA(BatchedModifications.class));
+                executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
+
+        expectBatchedModificationsReady(actorRef2);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+        transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
+        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
 
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
 
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
-
-        verifyCohortFutures(proxy, IllegalArgumentException.class);
+        verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
+                IllegalArgumentException.class);
     }
 
     @Test
     }
 
     @Test
@@ -746,7 +756,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
         doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
         doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
-        expectBatchedModificationsReady(actorRef);
+        expectBatchedModificationsReady(actorRef, true);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
@@ -755,11 +765,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        assertTrue(ready instanceof SingleCommitCohortProxy);
 
 
-        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+        verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
     }
 
     private static interface TransactionProxyOperation {
     }
 
     private static interface TransactionProxyOperation {
@@ -1224,8 +1232,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
 
         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
 
-        verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
-                new DeleteModification(deletePath2));
+        verifyBatchedModifications(batchedModifications.get(2), true, true,
+                new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
 
         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
     }
 
         assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
     }
index 9e1557ae3cf605255721e84bec9da1642c520c8c..caabb32d71e174ad03a785056b2cd84d06ab453b 100644 (file)
@@ -247,7 +247,7 @@ public class PreLithiumShardTest extends AbstractShardTest {
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
             // by the ShardTransaction.
 
             shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
-                    cohort1, modification1, true), getRef());
+                    cohort1, modification1, true, false), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -262,11 +262,11 @@ public class PreLithiumShardTest extends AbstractShardTest {
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
-                    cohort2, modification2, true), getRef());
+                    cohort2, modification2, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
-                    cohort3, modification3, true), getRef());
+                    cohort3, modification3, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
index 4cf8b67ddbdebeef130b3932a8f77d640b5337ae..ca342b960a8749d78ae5aa0b1879b06ecc61110b 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore.compat;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
@@ -18,16 +19,22 @@ import static org.opendaylight.controller.cluster.datastore.TransactionProxy.Tra
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
 import akka.dispatch.Futures;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
 import akka.dispatch.Futures;
+import akka.util.Timeout;
 import com.google.common.base.Optional;
 import java.util.concurrent.TimeUnit;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import com.google.common.base.Optional;
 import java.util.concurrent.TimeUnit;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy;
 import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
@@ -36,7 +43,9 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -95,12 +104,37 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         return argThat(matcher);
     }
 
         return argThat(matcher);
     }
 
+    private CanCommitTransaction eqCanCommitTransaction(final String transactionID) {
+        ArgumentMatcher<CanCommitTransaction> matcher = new ArgumentMatcher<CanCommitTransaction>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ThreePhaseCommitCohortMessages.CanCommitTransaction.class.equals(argument.getClass()) &&
+                        CanCommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
+    private CommitTransaction eqCommitTransaction(final String transactionID) {
+        ArgumentMatcher<CommitTransaction> matcher = new ArgumentMatcher<CommitTransaction>() {
+            @Override
+            public boolean matches(Object argument) {
+                return ThreePhaseCommitCohortMessages.CommitTransaction.class.equals(argument.getClass()) &&
+                        CommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID);
+            }
+        };
+
+        return argThat(matcher);
+    }
+
     private Future<Object> readySerializedTxReply(String path, short version) {
         return Futures.successful(new ReadyTransactionReply(path, version).toSerializable());
     }
 
     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
     private Future<Object> readySerializedTxReply(String path, short version) {
         return Futures.successful(new ReadyTransactionReply(path, version).toSerializable());
     }
 
     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version,
+                DefaultShardStrategy.DEFAULT_SHARD);
 
         NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
 
         NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -136,13 +170,24 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+        AbstractThreePhaseCommitCohort<?> proxy = transactionProxy.ready();
 
 
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
 
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)),
+                    eqCanCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class));
 
 
-        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+        doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)),
+                   eqCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class));
+
+        Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS);
+        assertEquals("canCommit", true, canCommit.booleanValue());
+
+        proxy.preCommit().get(3, TimeUnit.SECONDS);
+
+        proxy.commit().get(3, TimeUnit.SECONDS);
 
         return actorRef;
     }
 
         return actorRef;
     }
@@ -169,7 +214,8 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
     // creating transaction actors for write-only Tx's.
     public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
         short version = DataStoreVersions.HELIUM_2_VERSION;
     // creating transaction actors for write-only Tx's.
     public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
         short version = DataStoreVersions.HELIUM_2_VERSION;
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version);
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version,
+                DefaultShardStrategy.DEFAULT_SHARD);
 
         NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
 
         NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);