Optimize TransactionProxy for write-only transactions 60/16160/8
authorTom Pantelis <tpanteli@brocade.com>
Sat, 7 Mar 2015 04:40:15 +0000 (23:40 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 24 Mar 2015 17:37:16 +0000 (13:37 -0400)
For write-only transactions, modified TransactionProxy to prepare
the Tx modifications directly on the Shard rather than having the
Shard create a separate ShardTransaction actor. The BatchedModifications
messages are sent directly to the shard. On ready, the last
BatchedModifications is flagged as ready, thus also eliminating the
ReadyTransaction message. In this manner, we eliminate the
CreateTransaction and the ReadyTransaction messages and the
the ShardTransaction actor.

Several new fields were added to the BatchedModifications message:
transactionID, transactionChainID, and ready. On the last, readied
BatchedModifications message, the Shard returns the
BatchedModificationsReply message with the cohort actor (the Shard
itself).

The BatchedModifications messages are handled by the
ShardCommitCoordinator. I started creating a separate
ShardTransactionCoordinator but realized I'd have to duplicate some
storage plus the 2 coordinators should have to interface. The
CohortEntry is used to store the prepared DOMStoreWriteTransaction.
Perhaps ShardCommitCoordinator should be renamed to
ShardTransactionCoordinator but that can be refactored later.

I create a DOMTransactionFactory that is used by both the
ShardCommitCoordinator and the Shard to create DOM transactions and
update the mbean stats.

Change-Id: I0d8126149f80854feb504ed9d8e49bcc7db253ce
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
20 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java
new file mode 100644 (file)
index 0000000..f243620
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.slf4j.Logger;
+
+/**
+ * A factory for creating DOM transactions, either normal or chained.
+ *
+ * @author Thomas Pantelis
+ */
+public class DOMTransactionFactory {
+
+    private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+    private final InMemoryDOMDataStore store;
+    private final ShardStats shardMBean;
+    private final Logger log;
+    private final String name;
+
+    public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) {
+        this.store = store;
+        this.shardMBean = shardMBean;
+        this.log = log;
+        this.name = name;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T extends DOMStoreTransaction> T newTransaction(TransactionProxy.TransactionType type,
+            String transactionID, String transactionChainID) {
+
+        DOMStoreTransactionFactory factory = store;
+
+        if(!transactionChainID.isEmpty()) {
+            factory = transactionChains.get(transactionChainID);
+            if(factory == null) {
+                if(log.isDebugEnabled()) {
+                    log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID,
+                            transactionChainID);
+                }
+
+                DOMStoreTransactionChain transactionChain = store.createTransactionChain();
+                transactionChains.put(transactionChainID, transactionChain);
+                factory = transactionChain;
+            }
+        } else {
+            log.debug("{}: Creating transaction with ID {}", name, transactionID);
+        }
+
+        T transaction = null;
+        switch(type) {
+            case READ_ONLY:
+                transaction = (T) factory.newReadOnlyTransaction();
+                shardMBean.incrementReadOnlyTransactionCount();
+                break;
+            case READ_WRITE:
+                transaction = (T) factory.newReadWriteTransaction();
+                shardMBean.incrementReadWriteTransactionCount();
+                break;
+            case WRITE_ONLY:
+                transaction = (T) factory.newWriteOnlyTransaction();
+                shardMBean.incrementWriteOnlyTransactionCount();
+                break;
+        }
+
+        return transaction;
+    }
+
+    public void closeTransactionChain(String transactionChainID) {
+        DOMStoreTransactionChain chain =
+                transactionChains.remove(transactionChainID);
+
+        if(chain != null) {
+            chain.close();
+        }
+    }
+
+    public void closeAllTransactionChains() {
+        for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
+            entry.getValue().close();
+        }
+
+        transactionChains.clear();
+    }
+}
index 7f8a4e7..d5142c9 100644 (file)
@@ -58,6 +58,7 @@ public class DatastoreContext {
     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
+    private boolean writeOnlyTransactionOptimizationsEnabled = false;
 
     private DatastoreContext() {
         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
@@ -82,6 +83,7 @@ public class DatastoreContext {
         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
         this.dataStoreType = other.dataStoreType;
         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
+        this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -186,6 +188,10 @@ public class DatastoreContext {
         return shardBatchedModificationCount;
     }
 
+    public boolean isWriteOnlyTransactionOptimizationsEnabled() {
+        return writeOnlyTransactionOptimizationsEnabled;
+    }
+
     public static class Builder {
         private final DatastoreContext datastoreContext;
         private int maxShardDataChangeExecutorPoolSize =
@@ -326,6 +332,11 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
+            datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
+            return this;
+        }
+
         public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
             return this;
index 9ec4f9c..99bc9de 100644 (file)
@@ -43,6 +43,8 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@ -76,8 +78,6 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -105,7 +105,7 @@ public class Shard extends RaftActor {
     private final InMemoryDOMDataStore store;
 
     /// The name of this shard
-    private final ShardIdentifier name;
+    private final String name;
 
     private final ShardStats shardMBean;
 
@@ -142,7 +142,7 @@ public class Shard extends RaftActor {
     private ShardRecoveryCoordinator recoveryCoordinator;
     private List<Object> currentLogRecoveryBatch;
 
-    private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+    private final DOMTransactionFactory transactionFactory;
 
     private final String txnDispatcherPath;
 
@@ -151,7 +151,7 @@ public class Shard extends RaftActor {
         super(name.toString(), mapPeerAddresses(peerAddresses),
                 Optional.of(datastoreContext.getShardRaftConfig()));
 
-        this.name = name;
+        this.name = name.toString();
         this.datastoreContext = datastoreContext;
         this.schemaContext = schemaContext;
         this.dataPersistenceProvider = (datastoreContext.isPersistent())
@@ -178,8 +178,11 @@ public class Shard extends RaftActor {
             getContext().become(new MeteringBehavior(this));
         }
 
-        commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
-                datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
+        transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+
+        commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+                TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+                datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
         setTransactionCommitTimeout();
 
@@ -272,6 +275,8 @@ public class Shard extends RaftActor {
         try {
             if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCreateTransaction(message);
+            } else if (BatchedModifications.class.isInstance(message)) {
+                handleBatchedModifications((BatchedModifications)message);
             } else if (message instanceof ForwardedReadyTransaction) {
                 handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
@@ -451,6 +456,47 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit, getSender(), self());
     }
 
+    private void handleBatchedModifications(BatchedModifications batched) {
+        // This message is sent to prepare the modificationsa transaction directly on the Shard as an
+        // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
+        // BatchedModifications message, the caller sets the ready flag in the message indicating
+        // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
+        // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
+        // ReadyTransaction message.
+
+        // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
+        // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
+        // the primary/leader shard. However with timing and caching on the front-end, there's a small
+        // window where it could have a stale leader during leadership transitions.
+        //
+        if(isLeader()) {
+            try {
+                BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
+                sender().tell(reply, self());
+            } catch (Exception e) {
+                LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+                        batched.getTransactionID(), e);
+                getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+            }
+        } else {
+            ActorSelection leader = getLeader();
+            if(leader != null) {
+                // TODO: what if this is not the first batch and leadership changed in between batched messages?
+                // We could check if the commitCoordinator already has a cached entry and forward all the previous
+                // batched modifications.
+                LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
+                leader.forward(batched, getContext());
+            } else {
+                // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+                // it more resilient in case we're in the process of electing a new leader.
+                getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+                    "Could not find the leader for shard %s. This typically happens" +
+                    " when the system is coming up or recovering and a leader is being elected. Try again" +
+                    " later.", persistenceId()))), getSelf());
+            }
+        }
+    }
+
     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
         LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
                 ready.getTransactionID(), ready.getTxnClientVersion());
@@ -459,7 +505,7 @@ public class Shard extends RaftActor {
         // commitCoordinator in preparation for the subsequent three phase commit initiated by
         // the front-end.
         commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
-                ready.getModification());
+                (MutableCompositeModification) ready.getModification());
 
         // Return our actor path as we'll handle the three phase commit, except if the Tx client
         // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
@@ -536,56 +582,18 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        DOMStoreTransactionChain chain =
-            transactionChains.remove(closeTransactionChain.getTransactionChainId());
-
-        if(chain != null) {
-            chain.close();
-        }
+        transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
             ShardTransactionIdentifier transactionId, String transactionChainId,
             short clientVersion ) {
 
-        DOMStoreTransactionFactory factory = store;
-
-        if(!transactionChainId.isEmpty()) {
-            factory = transactionChains.get(transactionChainId);
-            if(factory == null){
-                DOMStoreTransactionChain transactionChain = store.createTransactionChain();
-                transactionChains.put(transactionChainId, transactionChain);
-                factory = transactionChain;
-            }
-        }
-
-        if(this.schemaContext == null) {
-            throw new IllegalStateException("SchemaContext is not set");
-        }
+        DOMStoreTransaction transaction = transactionFactory.newTransaction(
+                TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
+                transactionChainId);
 
-        if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
-
-            shardMBean.incrementWriteOnlyTransactionCount();
-
-            return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
-
-        } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
-
-            shardMBean.incrementReadWriteTransactionCount();
-
-            return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
-
-        } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
-
-            shardMBean.incrementReadOnlyTransactionCount();
-
-            return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
-
-        } else {
-            throw new IllegalArgumentException(
-                "Shard="+name + ":CreateTransaction message has unidentified transaction type="
-                    + transactionType);
-        }
+        return createShardTransaction(transaction, transactionId, clientVersion);
     }
 
     private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
@@ -906,17 +914,14 @@ public class Shard extends RaftActor {
         }
 
         // If this actor is no longer the leader close all the transaction chains
-        if(!isLeader){
-            for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug(
-                        "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
-                        persistenceId(), entry.getKey(), getId());
-                }
-                entry.getValue().close();
+        if(!isLeader) {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
+                    persistenceId(), getId());
             }
 
-            transactionChains.clear();
+            transactionFactory.closeAllTransactionChains();
         }
     }
 
@@ -926,7 +931,7 @@ public class Shard extends RaftActor {
     }
 
     @Override public String persistenceId() {
-        return this.name.toString();
+        return this.name;
     }
 
     @VisibleForTesting
@@ -934,6 +939,12 @@ public class Shard extends RaftActor {
         return dataPersistenceProvider;
     }
 
+    @VisibleForTesting
+    ShardCommitCoordinator getCommitCoordinator() {
+        return commitCoordinator;
+    }
+
+
     private static class ShardCreator implements Creator<Shard> {
 
         private static final long serialVersionUID = 1L;
index 5d0ca38..54f15fc 100644 (file)
@@ -9,17 +9,26 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.slf4j.Logger;
 
 /**
@@ -29,10 +38,17 @@ import org.slf4j.Logger;
  */
 public class ShardCommitCoordinator {
 
+    // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+    public interface CohortDecorator {
+        DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
+    }
+
     private final Cache<String, CohortEntry> cohortCache;
 
     private CohortEntry currentCohortEntry;
 
+    private final DOMTransactionFactory transactionFactory;
+
     private final Queue<CohortEntry> queuedCohortEntries;
 
     private int queueCapacity;
@@ -41,14 +57,33 @@ public class ShardCommitCoordinator {
 
     private final String name;
 
-    public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
-            String name) {
-        cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
-                cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
+    private final String shardActorPath;
+
+    private final RemovalListener<String, CohortEntry> cacheRemovalListener =
+            new RemovalListener<String, CohortEntry>() {
+                @Override
+                public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
+                    if(notification.getCause() == RemovalCause.EXPIRED) {
+                        log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
+                    }
+                }
+            };
+
+    // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+    private CohortDecorator cohortDecorator;
+
+    public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
+            long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
 
         this.queueCapacity = queueCapacity;
         this.log = log;
         this.name = name;
+        this.transactionFactory = transactionFactory;
+
+        shardActorPath = Serialization.serializedActorPath(shardActor);
+
+        cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
+                removalListener(cacheRemovalListener).build();
 
         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
         // since this should only be accessed on the shard's dispatcher.
@@ -60,19 +95,62 @@ public class ShardCommitCoordinator {
     }
 
     /**
-     * This method caches a cohort entry for the given transactions ID in preparation for the
-     * subsequent 3-phase commit.
+     * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
+     * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
      *
      * @param transactionID the ID of the transaction
      * @param cohort the cohort to participate in the transaction commit
-     * @param modification the modification made by the transaction
+     * @param modification the modifications made by the transaction
      */
     public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
-            Modification modification) {
+            MutableCompositeModification modification) {
 
         cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
     }
 
+    /**
+     * This method handles a BatchedModifications message for a transaction being prepared directly on the
+     * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
+     * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
+     * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
+     *
+     * @param batched the BatchedModifications
+     * @param shardActor the transaction's shard actor
+     *
+     * @throws ExecutionException if an error occurs loading the cache
+     */
+    public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched)
+            throws ExecutionException {
+        CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
+        if(cohortEntry == null) {
+            cohortEntry = new CohortEntry(batched.getTransactionID(),
+                    transactionFactory.<DOMStoreWriteTransaction>newTransaction(
+                        TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
+                        batched.getTransactionChainID()));
+            cohortCache.put(batched.getTransactionID(), cohortEntry);
+        }
+
+        if(log.isDebugEnabled()) {
+            log.debug("{}: Applying {} batched modifications for Tx {}", name,
+                    batched.getModifications().size(), batched.getTransactionID());
+        }
+
+        cohortEntry.applyModifications(batched.getModifications());
+
+        String cohortPath = null;
+        if(batched.isReady()) {
+            if(log.isDebugEnabled()) {
+                log.debug("{}: Readying Tx {}, client version {}", name,
+                        batched.getTransactionID(), batched.getVersion());
+            }
+
+            cohortEntry.ready(cohortDecorator);
+            cohortPath = shardActorPath;
+        }
+
+        return new BatchedModificationsReply(batched.getModifications().size(), cohortPath);
+    }
+
     /**
      * This method handles the canCommit phase for a transaction.
      *
@@ -216,19 +294,33 @@ public class ShardCommitCoordinator {
         }
     }
 
+    @VisibleForTesting
+    void setCohortDecorator(CohortDecorator cohortDecorator) {
+        this.cohortDecorator = cohortDecorator;
+    }
+
+
     static class CohortEntry {
         private final String transactionID;
-        private final DOMStoreThreePhaseCommitCohort cohort;
-        private final Modification modification;
+        private DOMStoreThreePhaseCommitCohort cohort;
+        private final MutableCompositeModification compositeModification;
+        private final DOMStoreWriteTransaction transaction;
         private ActorRef canCommitSender;
         private ActorRef shard;
         private long lastAccessTime;
 
+        CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
+            this.compositeModification = new MutableCompositeModification();
+            this.transaction = transaction;
+            this.transactionID = transactionID;
+        }
+
         CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
-                Modification modification) {
+                MutableCompositeModification compositeModification) {
             this.transactionID = transactionID;
             this.cohort = cohort;
-            this.modification = modification;
+            this.compositeModification = compositeModification;
+            this.transaction = null;
         }
 
         void updateLastAccessTime() {
@@ -247,8 +339,26 @@ public class ShardCommitCoordinator {
             return cohort;
         }
 
-        Modification getModification() {
-            return modification;
+        MutableCompositeModification getModification() {
+            return compositeModification;
+        }
+
+        void applyModifications(Iterable<Modification> modifications) {
+            for(Modification modification: modifications) {
+                compositeModification.addModification(modification);
+                modification.apply(transaction);
+            }
+        }
+
+        void ready(CohortDecorator cohortDecorator) {
+            Preconditions.checkState(cohort == null, "cohort was already set");
+
+            cohort = transaction.ready();
+
+            if(cohortDecorator != null) {
+                // Call the hook for unit tests.
+                cohort = cohortDecorator.decorate(transactionID, cohort);
+            }
         }
 
         ActorRef getCanCommitSender() {
@@ -268,10 +378,7 @@ public class ShardCommitCoordinator {
         }
 
         boolean hasModifications(){
-            if(modification instanceof CompositeModification){
-                return ((CompositeModification) modification).getModifications().size() > 0;
-            }
-            return true;
+            return compositeModification.getModifications().size() > 0;
         }
     }
 }
index c1f9c78..3a20963 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
@@ -40,8 +41,8 @@ import scala.concurrent.Future;
 public class TransactionContextImpl extends AbstractTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
+    private final String transactionChainId;
     private final ActorContext actorContext;
-    private final String transactionPath;
     private final ActorSelection actor;
     private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
@@ -49,12 +50,12 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     private final OperationCompleter operationCompleter;
     private BatchedModifications batchedModifications;
 
-    protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
-            ActorContext actorContext, SchemaContext schemaContext,
-            boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
+    protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+            String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+            short remoteTransactionVersion, OperationCompleter operationCompleter) {
         super(identifier);
-        this.transactionPath = transactionPath;
         this.actor = actor;
+        this.transactionChainId = transactionChainId;
         this.actorContext = actorContext;
         this.isTxActorLocal = isTxActorLocal;
         this.remoteTransactionVersion = remoteTransactionVersion;
@@ -71,6 +72,10 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         return actor;
     }
 
+    protected ActorContext getActorContext() {
+        return actorContext;
+    }
+
     protected short getRemoteTransactionVersion() {
         return remoteTransactionVersion;
     }
@@ -93,21 +98,24 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
         // Send the remaining batched modifications if any.
 
-        sendBatchedModifications();
+        sendAndRecordBatchedModifications();
 
         // Send the ReadyTransaction message to the Tx actor.
 
-        final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+        Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
+        return combineRecordedOperationsFutures(readyReplyFuture);
+    }
+
+    protected Future<ActorSelection> combineRecordedOperationsFutures(final Future<Object> withLastReplyFuture) {
         // Combine all the previously recorded put/merge/delete operation reply Futures and the
         // ReadyTransactionReply Future into one Future. If any one fails then the combined
         // Future will fail. We need all prior operations and the ready operation to succeed
         // in order to attempt commit.
 
-        List<Future<Object>> futureList =
-                Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
+        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
         futureList.addAll(recordedOperationFutures);
-        futureList.add(replyFuture);
+        futureList.add(withLastReplyFuture);
 
         Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
                 actorContext.getClientDispatcher());
@@ -129,28 +137,15 @@ public class TransactionContextImpl extends AbstractTransactionContext {
                 // de-serializing each reply.
 
                 // Note the Future get call here won't block as it's complete.
-                Object serializedReadyReply = replyFuture.value().get().get();
+                Object serializedReadyReply = withLastReplyFuture.value().get().get();
                 if (serializedReadyReply instanceof ReadyTransactionReply) {
                     return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
-
+                } else if(serializedReadyReply instanceof BatchedModificationsReply) {
+                    return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
                 } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                     ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                    String cohortPath = reply.getCohortPath();
-
-                    // In Helium we used to return the local path of the actor which represented
-                    // a remote ThreePhaseCommitCohort. The local path would then be converted to
-                    // a remote path using this resolvePath method. To maintain compatibility with
-                    // a Helium node we need to continue to do this conversion.
-                    // At some point in the future when upgrades from Helium are not supported
-                    // we could remove this code to resolvePath and just use the cohortPath as the
-                    // resolved cohortPath
-                    if(TransactionContextImpl.this.remoteTransactionVersion <
-                            DataStoreVersions.HELIUM_1_VERSION) {
-                        cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
-                    }
-
+                    String cohortPath = deserializeCohortPath(reply.getCohortPath());
                     return actorContext.actorSelection(cohortPath);
-
                 } else {
                     // Throwing an exception here will fail the Future.
                     throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
@@ -160,27 +155,51 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
+    protected String deserializeCohortPath(String cohortPath) {
+        return cohortPath;
+    }
+
     private void batchModification(Modification modification) {
         if(batchedModifications == null) {
-            batchedModifications = new BatchedModifications(remoteTransactionVersion);
+            batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+                    transactionChainId);
         }
 
         batchedModifications.addModification(modification);
 
         if(batchedModifications.getModifications().size() >=
                 actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
-            sendBatchedModifications();
+            sendAndRecordBatchedModifications();
         }
     }
 
-    private void sendBatchedModifications() {
+    private void sendAndRecordBatchedModifications() {
+        Future<Object> sentFuture = sendBatchedModifications();
+        if(sentFuture != null) {
+            recordedOperationFutures.add(sentFuture);
+        }
+    }
+
+    protected Future<Object> sendBatchedModifications() {
+        return sendBatchedModifications(false);
+    }
+
+    protected Future<Object> sendBatchedModifications(boolean ready) {
+        Future<Object> sent = null;
         if(batchedModifications != null) {
-            LOG.debug("Tx {} sending {} batched modifications", identifier,
-                    batchedModifications.getModifications().size());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier,
+                        batchedModifications.getModifications().size(), ready);
+            }
 
-            recordedOperationFutures.add(executeOperationAsync(batchedModifications));
-            batchedModifications = null;
+            batchedModifications.setReady(ready);
+            sent = executeOperationAsync(batchedModifications);
+
+            batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+                    transactionChainId);
         }
+
+        return sent;
     }
 
     @Override
@@ -212,7 +231,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
         // Send the remaining batched modifications if any.
 
-        sendBatchedModifications();
+        sendAndRecordBatchedModifications();
 
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
@@ -297,7 +316,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
         // Send the remaining batched modifications if any.
 
-        sendBatchedModifications();
+        sendAndRecordBatchedModifications();
 
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
index 64b9086..7f2f328 100644 (file)
@@ -70,7 +70,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     public static enum TransactionType {
         READ_ONLY,
         WRITE_ONLY,
-        READ_WRITE
+        READ_WRITE;
+
+        public static TransactionType fromInt(int type) {
+            if(type == WRITE_ONLY.ordinal()) {
+                return WRITE_ONLY;
+            } else if(type == READ_WRITE.ordinal()) {
+                return READ_WRITE;
+            } else if(type == READ_ONLY.ordinal()) {
+                return READ_ONLY;
+            } else {
+                throw new IllegalArgumentException("In TransactionType enum value" + type);
+            }
+        }
     }
 
     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
@@ -550,10 +562,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Sets the target primary shard and initiates a CreateTransaction try.
          */
         void setPrimaryShard(ActorSelection primaryShard) {
-            LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
-
             this.primaryShard = primaryShard;
-            tryCreateTransaction();
+
+            if(transactionType == TransactionType.WRITE_ONLY &&
+                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+                LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier);
+
+                // For write-only Tx's we prepare the transaction modifications directly on the shard actor
+                // to avoid the overhead of creating a separate transaction actor.
+                // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
+                executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
+                        this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+            } else {
+                tryCreateTransaction();
+            }
         }
 
         /**
@@ -563,7 +585,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             boolean invokeOperation = true;
             synchronized(txOperationsOnComplete) {
                 if(transactionContext == null) {
-                    LOG.debug("Tx {} Adding operation on complete {}", identifier);
+                    LOG.debug("Tx {} Adding operation on complete", identifier);
 
                     invokeOperation = false;
                     txOperationsOnComplete.add(operation);
@@ -590,6 +612,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
+            LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
+
             Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
                     TransactionProxy.this.transactionType.ordinal(),
                     getTransactionChainId()).toSerializable();
@@ -636,8 +660,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // TransactionContext until after we've executed all cached TransactionOperations.
             TransactionContext localTransactionContext;
             if(failure != null) {
-                LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
-                        failure.getMessage());
+                LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure);
 
                 localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
             } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
@@ -687,11 +710,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
-            String transactionPath = reply.getTransactionPath();
-
             LOG.debug("Tx {} Received {}", identifier, reply);
 
-            ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+            return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
+                    reply.getTransactionPath(), reply.getVersion());
+        }
+
+        private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
+                String transactionPath, short remoteTransactionVersion) {
 
             if (transactionType == TransactionType.READ_ONLY) {
                 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
@@ -720,12 +746,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
-            if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
-                return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                    actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
-            } else {
+            if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
-                        actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+                        transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+                        operationCompleter);
+            } else if (transactionType == TransactionType.WRITE_ONLY &&
+                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+                return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId,
+                    actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+            } else {
+                return new TransactionContextImpl(transactionActor, identifier, transactionChainId,
+                        actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             }
         }
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java
new file mode 100644 (file)
index 0000000..3b4a190
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * Context for a write-only transaction.
+ *
+ * @author Thomas Pantelis
+ */
+public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
+    private static final Logger LOG = LoggerFactory.getLogger(WriteOnlyTransactionContextImpl.class);
+
+    public WriteOnlyTransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+            String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+            short remoteTransactionVersion, OperationCompleter operationCompleter) {
+        super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
+                remoteTransactionVersion, operationCompleter);
+    }
+
+    @Override
+    public Future<ActorSelection> readyTransaction() {
+        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+                identifier, recordedOperationFutures.size());
+
+        // Send the remaining batched modifications if any.
+
+        Future<Object> lastModificationsFuture = sendBatchedModifications(true);
+
+        return combineRecordedOperationsFutures(lastModificationsFuture);
+    }
+}
index e407c7c..ccfb329 100644 (file)
@@ -8,16 +8,21 @@
 package org.opendaylight.controller.cluster.datastore.compat;
 
 import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.OperationCompleter;
 import org.opendaylight.controller.cluster.datastore.TransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 
 /**
  * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't
@@ -26,12 +31,16 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * @author Thomas Pantelis
  */
 public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
+    private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class);
+
+    private final String transactionPath;
 
     public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
-            ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+            String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationCompleter operationCompleter) {
-        super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
-                remoteTransactionVersion,  operationCompleter);
+        super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
+                remoteTransactionVersion, operationCompleter);
+        this.transactionPath = transactionPath;
     }
 
     @Override
@@ -51,4 +60,32 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
         recordedOperationFutures.add(executeOperationAsync(
                 new WriteData(path, data, getRemoteTransactionVersion())));
     }
+
+    @Override
+    public Future<ActorSelection> readyTransaction() {
+        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+                identifier, recordedOperationFutures.size());
+
+        // Send the ReadyTransaction message to the Tx actor.
+
+        Future<Object> lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+
+        return combineRecordedOperationsFutures(lastReplyFuture);
+    }
+
+    @Override
+    protected String deserializeCohortPath(String cohortPath) {
+        // In base Helium we used to return the local path of the actor which represented
+        // a remote ThreePhaseCommitCohort. The local path would then be converted to
+        // a remote path using this resolvePath method. To maintain compatibility with
+        // a Helium node we need to continue to do this conversion.
+        // At some point in the future when upgrades from Helium are not supported
+        // we could remove this code to resolvePath and just use the cohortPath as the
+        // resolved cohortPath
+        if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+            return getActorContext().resolvePath(transactionPath, cohortPath);
+        }
+
+        return cohortPath;
+    }
 }
index d1f9495..fa1525c 100644 (file)
@@ -12,10 +12,14 @@ import com.google.common.base.Preconditions;
 
 public class ShardTransactionIdentifier {
     private final String remoteTransactionId;
+    private final String stringRepresentation;
 
     public ShardTransactionIdentifier(String remoteTransactionId) {
         this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId,
                 "remoteTransactionId should not be null");
+
+        stringRepresentation = new StringBuilder(remoteTransactionId.length() + 6).append("shard-").
+                append(remoteTransactionId).toString();
     }
 
     public String getRemoteTransactionId() {
@@ -46,9 +50,7 @@ public class ShardTransactionIdentifier {
     }
 
     @Override public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("shard-").append(remoteTransactionId);
-        return sb.toString();
+        return stringRepresentation;
     }
 
 }
index 670641f..a9ce94b 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 
 /**
@@ -17,15 +21,61 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi
 public class BatchedModifications extends MutableCompositeModification implements SerializableMessage {
     private static final long serialVersionUID = 1L;
 
+    private boolean ready;
+    private String transactionID;
+    private String transactionChainID;
+
     public BatchedModifications() {
     }
 
-    public BatchedModifications(short version) {
+    public BatchedModifications(String transactionID, short version, String transactionChainID) {
         super(version);
+        this.transactionID = Preconditions.checkNotNull(transactionID, "transactionID can't be null");
+        this.transactionChainID = transactionChainID != null ? transactionChainID : "";
+    }
+
+    public boolean isReady() {
+        return ready;
+    }
+
+    public void setReady(boolean ready) {
+        this.ready = ready;
+    }
+
+    public String getTransactionID() {
+        return transactionID;
+    }
+
+    public String getTransactionChainID() {
+        return transactionChainID;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        transactionID = in.readUTF();
+        transactionChainID = in.readUTF();
+        ready = in.readBoolean();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeUTF(transactionID);
+        out.writeUTF(transactionChainID);
+        out.writeBoolean(ready);
     }
 
     @Override
     public Object toSerializable() {
         return this;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
+                .append(", modifications size=").append(getModifications().size()).append("]");
+        return builder.toString();
+    }
 }
index 33c5733..a10c6ac 100644 (file)
@@ -19,7 +19,11 @@ import java.io.ObjectOutput;
 public class BatchedModificationsReply extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
+    private static final byte COHORT_PATH_NOT_PRESENT = 0;
+    private static final byte COHORT_PATH_PRESENT = 1;
+
     private int numBatched;
+    private String cohortPath;
 
     public BatchedModificationsReply() {
     }
@@ -28,25 +32,52 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage {
         this.numBatched = numBatched;
     }
 
+    public BatchedModificationsReply(int numBatched, String cohortPath) {
+        this.numBatched = numBatched;
+        this.cohortPath = cohortPath;
+    }
 
     public int getNumBatched() {
         return numBatched;
     }
 
+    public String getCohortPath() {
+        return cohortPath;
+    }
+
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
         numBatched = in.readInt();
+
+        if(in.readByte() == COHORT_PATH_PRESENT) {
+            cohortPath = in.readUTF();
+        }
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
         super.writeExternal(out);
         out.writeInt(numBatched);
+
+        if(cohortPath != null) {
+            out.writeByte(COHORT_PATH_PRESENT);
+            out.writeUTF(cohortPath);
+        } else {
+            out.writeByte(COHORT_PATH_NOT_PRESENT);
+        }
     }
 
     @Override
     public Object toSerializable() {
         return this;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=")
+                .append(cohortPath).append("]");
+        return builder.toString();
+    }
 }
index 2a660fa..b34737b 100644 (file)
@@ -11,6 +11,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 
 /**
  * Abstract base class for a versioned Externalizable message.
@@ -20,7 +21,7 @@ import java.io.ObjectOutput;
 public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage {
     private static final long serialVersionUID = 1L;
 
-    private short version;
+    private short version = DataStoreVersions.CURRENT_VERSION;
 
     public VersionedExternalizableMessage() {
     }
index 4896b05..c6c5486 100644 (file)
@@ -94,8 +94,7 @@ public abstract class AbstractTransactionProxyTest {
 
     protected final String memberName = "mock-member";
 
-    protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
-            shardBatchedModificationCount(1);
+    protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2);
 
     @BeforeClass
     public static void setUpClass() throws IOException {
@@ -251,6 +250,13 @@ public abstract class AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
+    protected void expectBatchedModificationsReady(ActorRef actorRef, int count) {
+        Future<BatchedModificationsReply> replyFuture = Futures.successful(
+                new BatchedModificationsReply(count, actorRef.path().toString()));
+        doReturn(replyFuture).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+    }
+
     protected void expectBatchedModifications(int count) {
         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
                 any(ActorSelection.class), isA(BatchedModifications.class));
@@ -307,15 +313,21 @@ public abstract class AbstractTransactionProxyTest {
     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
             TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) {
 
-        ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-        log.info("Created mock shard Tx actor {}", txActorRef);
+        ActorRef txActorRef;
+        if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+                dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) {
+            txActorRef = shardActorRef;
+        } else {
+            txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+            log.info("Created mock shard Tx actor {}", txActorRef);
 
-        doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection(
-                txActorRef.path().toString());
+            doReturn(actorSystem.actorSelection(txActorRef.path())).
+            when(mockActorContext).actorSelection(txActorRef.path().toString());
 
-        doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
-                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                        eqCreateTransaction(prefix, type));
+            doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
+            executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                    eqCreateTransaction(prefix, type));
+        }
 
         return txActorRef;
     }
@@ -358,17 +370,18 @@ public abstract class AbstractTransactionProxyTest {
         return captured;
     }
 
-    protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+    protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected, boolean expIsReady) {
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
-        verifyBatchedModifications(batchedModifications.get(0), expected);
+        verifyBatchedModifications(batchedModifications.get(0), expIsReady, expected);
     }
 
-    protected void verifyBatchedModifications(Object message, Modification... expected) {
+    protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) {
         assertEquals("Message type", BatchedModifications.class, message.getClass());
         BatchedModifications batchedModifications = (BatchedModifications)message;
         assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+        assertEquals("isReady", expIsReady, batchedModifications.isReady());
         for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
             Modification actual = batchedModifications.getModifications().get(i);
             assertEquals("Modification type", expected[i].getClass(), actual.getClass());
index 54a9e2d..a3c5eb4 100644 (file)
@@ -163,7 +163,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // Create the write Tx
 
-            final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+            // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard.
+            final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
             assertNotNull("newReadWriteTransaction returned null", writeTx);
 
             // Do some modification operations and ready the Tx on a separate thread.
@@ -473,7 +474,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // Create the write Tx.
 
-            final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+            final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
             assertNotNull("newReadWriteTransaction returned null", writeTx);
 
             // Do some modifications and ready the Tx on a separate thread.
index d888d62..0fbe686 100644 (file)
@@ -10,6 +10,7 @@ import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.dispatch.Dispatchers;
@@ -35,6 +36,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
@@ -42,6 +44,8 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@ -49,6 +53,8 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionR
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
@@ -89,6 +95,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -98,6 +105,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 public class ShardTest extends AbstractShardTest {
+
     @Test
     public void testRegisterChangeListener() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -424,42 +432,42 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
-
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
-            String transactionID1 = "tx1";
-            MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+            final String transactionID1 = "tx1";
+            final String transactionID2 = "tx2";
+            final String transactionID3 = "tx3";
 
-            String transactionID2 = "tx2";
-            MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
-                    TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
-                    modification2);
+            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
+            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
+            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
+            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+                @Override
+                public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
+                    if(transactionID.equals(transactionID1)) {
+                        mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
+                        return mockCohort1.get();
+                    } else if(transactionID.equals(transactionID2)) {
+                        mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
+                        return mockCohort2.get();
+                    } else {
+                        mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
+                        return mockCohort3.get();
+                    }
+                }
+            };
 
-            String transactionID3 = "tx3";
-            MutableCompositeModification modification3 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
-                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
-                    modification3);
+            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
 
             long timeoutSec = 5;
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
-            // by the ShardTransaction.
+            // Send a BatchedModifications message for the first transaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
-            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
-                    expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
-            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
+            assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
+            assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -468,15 +476,16 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send the ForwardedReadyTransaction for the next 2 Tx's.
+            // Send BatchedModifications for the next 2 Tx's.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
+                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
             // processed after the first Tx completes.
@@ -569,16 +578,16 @@ public class ShardTest extends AbstractShardTest {
 
             assertEquals("Commits complete", true, done);
 
-            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
-            inOrder.verify(cohort1).canCommit();
-            inOrder.verify(cohort1).preCommit();
-            inOrder.verify(cohort1).commit();
-            inOrder.verify(cohort2).canCommit();
-            inOrder.verify(cohort2).preCommit();
-            inOrder.verify(cohort2).commit();
-            inOrder.verify(cohort3).canCommit();
-            inOrder.verify(cohort3).preCommit();
-            inOrder.verify(cohort3).commit();
+            InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
+            inOrder.verify(mockCohort1.get()).canCommit();
+            inOrder.verify(mockCohort1.get()).preCommit();
+            inOrder.verify(mockCohort1.get()).commit();
+            inOrder.verify(mockCohort2.get()).canCommit();
+            inOrder.verify(mockCohort2.get()).preCommit();
+            inOrder.verify(mockCohort2.get()).commit();
+            inOrder.verify(mockCohort3.get()).canCommit();
+            inOrder.verify(mockCohort3.get()).preCommit();
+            inOrder.verify(mockCohort3.get()).commit();
 
             // Verify data in the data store.
 
@@ -601,34 +610,62 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
+            NormalizedNode<?, ?> data, boolean ready) {
+        return newBatchedModifications(transactionID, null, path, data, ready);
+    }
+
+    private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
+            YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
+        BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
+        batched.addModification(new WriteModification(path, data));
+        batched.setReady(ready);
+        return batched;
+    }
+
+    @SuppressWarnings("unchecked")
     @Test
-    public void testCommitWithPersistenceDisabled() throws Throwable {
-        dataStoreContextBuilder.persistent(false);
+    public void testMultipleBatchedModifications() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testCommitPhaseFailure");
+                    "testMultipleBatchedModifications");
 
             waitUntilLeader(shard);
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            final String transactionID = "tx";
+            FiniteDuration duration = duration("5 seconds");
 
-            // Setup a simulated transactions with a mock cohort.
+            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+                @Override
+                public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+                    if(mockCohort.get() == null) {
+                        mockCohort.set(createDelegatingMockCohort("cohort", actual));
+                    }
 
-            String transactionID = "tx";
-            MutableCompositeModification modification = new MutableCompositeModification();
-            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
-                    TestModel.TEST_PATH, containerNode, modification);
+                    return mockCohort.get();
+                }
+            };
 
-            FiniteDuration duration = duration("5 seconds");
+            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+            // Send a BatchedModifications to start a transaction.
+
+            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
+
+            // Send a couple more BatchedModifications.
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
+            shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
+                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -642,10 +679,153 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
-            InOrder inOrder = inOrder(cohort);
-            inOrder.verify(cohort).canCommit();
-            inOrder.verify(cohort).preCommit();
-            inOrder.verify(cohort).commit();
+            InOrder inOrder = inOrder(mockCohort.get());
+            inOrder.verify(mockCohort.get()).canCommit();
+            inOrder.verify(mockCohort.get()).preCommit();
+            inOrder.verify(mockCohort.get()).commit();
+
+            // Verify data in the data store.
+
+            NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
+            assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+                    outerList.getValue() instanceof Iterable);
+            Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
+            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+                       entry instanceof MapEntryNode);
+            MapEntryNode mapEntry = (MapEntryNode)entry;
+            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+            assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testBatchedModificationsOnTransactionChain() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testBatchedModificationsOnTransactionChain");
+
+            waitUntilLeader(shard);
+
+            String transactionChainID = "txChain";
+            String transactionID1 = "tx1";
+            String transactionID2 = "tx2";
+
+            FiniteDuration duration = duration("5 seconds");
+
+            // Send a BatchedModifications to start a chained write transaction and ready it.
+
+            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            YangInstanceIdentifier path = TestModel.TEST_PATH;
+            shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
+                    containerNode, true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
+
+            // Create a read Tx on the same chain.
+
+            shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
+                    transactionChainID).toSerializable(), getRef());
+
+            CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
+
+            getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
+            ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
+            assertEquals("Read node", containerNode, readReply.getNormalizedNode());
+
+            // Commit the write transaction.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Verify data in the data store.
+
+            NormalizedNode<?, ?> actualNode = readStore(shard, path);
+            assertEquals("Stored node", containerNode, actualNode);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testOnBatchedModificationsWhenNotLeader() {
+        final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
+        new ShardTestKit(getSystem()) {{
+            Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                            newDatastoreContext(), SCHEMA_CONTEXT) {
+                        @Override
+                        protected boolean isLeader() {
+                            return overrideLeaderCalls.get() ? false : super.isLeader();
+                        }
+
+                        @Override
+                        protected ActorSelection getLeader() {
+                            return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
+                                super.getLeader();
+                        }
+                    };
+                }
+            };
+
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
+
+            waitUntilLeader(shard);
+
+            overrideLeaderCalls.set(true);
+
+            BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
+
+            shard.tell(batched, ActorRef.noSender());
+
+            expectMsgEquals(batched);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testCommitWithPersistenceDisabled() throws Throwable {
+        dataStoreContextBuilder.persistent(false);
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testCommitWithPersistenceDisabled");
+
+            waitUntilLeader(shard);
+
+            String transactionID = "tx";
+            FiniteDuration duration = duration("5 seconds");
+
+            // Send a BatchedModifications to start a transaction.
+
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
+
+            // Send the CanCommitTransaction message.
+
+            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+            // Send the CanCommitTransaction message.
+
+            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
@@ -774,34 +954,40 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            // Setup 2 simulated transactions with mock cohorts. The first one fails in the
-            // commit phase.
+            // Setup 2 mock cohorts. The first one fails in the commit phase.
 
-            String transactionID1 = "tx1";
-            MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            final String transactionID1 = "tx1";
+            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
 
-            String transactionID2 = "tx2";
-            MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            final String transactionID2 = "tx2";
+            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
+            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+                @Override
+                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+                        DOMStoreThreePhaseCommitCohort actual) {
+                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
+                }
+            };
+
+            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
+            // Send BatchedModifications to start and ready each transaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -854,19 +1040,27 @@ public class ShardTest extends AbstractShardTest {
             waitUntilLeader(shard);
 
             String transactionID = "tx1";
-            MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
 
+            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+                @Override
+                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+                        DOMStoreThreePhaseCommitCohort actual) {
+                    return cohort;
+                }
+            };
+
+            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
             FiniteDuration duration = duration("5 seconds");
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
+            // Send BatchedModifications to start and ready a transaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -901,16 +1095,24 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             String transactionID = "tx1";
-            MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
+            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+                @Override
+                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+                        DOMStoreThreePhaseCommitCohort actual) {
+                    return cohort;
+                }
+            };
+
+            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+            // Send BatchedModifications to start and ready a transaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -954,14 +1156,9 @@ public class ShardTest extends AbstractShardTest {
                 }
             };
 
-            MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
-                    modification, preCommit);
-
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
-                    cohort, modification, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
@@ -995,42 +1192,26 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-            // Create 1st Tx - will timeout
+            // Create and ready the 1st Tx - will timeout
 
             String transactionID1 = "tx1";
-            MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
-                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
-                    modification1);
+            shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
+                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
-            // Create 2nd Tx
+            // Create and ready the 2nd Tx
 
-            String transactionID2 = "tx3";
-            MutableCompositeModification modification2 = new MutableCompositeModification();
+            String transactionID2 = "tx2";
             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
-                    listNodePath,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
-                    modification2);
-
-            // Ready the Tx's
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
-
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+            shard.tell(newBatchedModifications(transactionID2, listNodePath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
@@ -1067,38 +1248,23 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
             String transactionID1 = "tx1";
-            MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
             String transactionID2 = "tx2";
-            MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
-                    TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
-                    modification2);
-
             String transactionID3 = "tx3";
-            MutableCompositeModification modification3 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
 
-            // Ready the Tx's
+            // Send a BatchedModifications to start transactions and ready them.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
             // canCommit 1st Tx.
 
@@ -1143,30 +1309,37 @@ public class ShardTest extends AbstractShardTest {
 
             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
 
-            String transactionID1 = "tx1";
-            MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            final String transactionID1 = "tx1";
+            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
 
-            String transactionID2 = "tx2";
-            MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            final String transactionID2 = "tx2";
+            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Simulate the ForwardedReadyTransaction messages that would be sent
-            // by the ShardTransaction.
+            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+                @Override
+                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+                        DOMStoreThreePhaseCommitCohort actual) {
+                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
+                }
+            };
+
+            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            // Send BatchedModifications to start and ready each transaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
+
+            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+            expectMsgClass(duration, BatchedModificationsReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -1380,6 +1553,7 @@ public class ShardTest extends AbstractShardTest {
             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
         }};
+
     }
 
     @Test
index c6b5cb4..8ebb145 100644 (file)
@@ -372,7 +372,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
 
-            BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
             batched.addModification(new WriteModification(writePath, writeData));
             batched.addModification(new MergeModification(mergePath, mergeData));
             batched.addModification(new DeleteModification(deletePath));
index 4f00ed5..acba775 100644 (file)
@@ -29,8 +29,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@ -111,6 +114,74 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
         verify(mockActorContext, times(0)).acquireTxCreationPermit();
     }
 
+    /**
+     * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
+     * initiated until the first one completes its read future.
+     */
+    @Test
+    public void testChainedWriteOnlyTransactions() throws Exception {
+        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+        Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
+        doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
+
+        DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
+
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+        writeTx1.ready();
+
+        verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
+
+        ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+        expectBatchedModifications(txActorRef2, 1);
+
+        final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
+
+        final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+        final CountDownLatch write2Complete = new CountDownLatch(1);
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
+                } catch (Exception e) {
+                    caughtEx.set(e);
+                } finally {
+                    write2Complete.countDown();
+                }
+            }
+        }.start();
+
+        assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
+
+        if(caughtEx.get() != null) {
+            throw caughtEx.get();
+        }
+
+        try {
+            verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } catch (AssertionError e) {
+            fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
+        }
+
+        batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
+
+        // Tx 2 should've proceeded to find the primary shard.
+        verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+    }
+
     /**
      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
      * initiated until the first one completes its read future.
@@ -134,7 +205,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         writeTx1.ready();
 
-        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1));
+        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
 
         String tx2MemberName = "tx2MemberName";
         doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
index 8278d3c..ac2c079 100644 (file)
@@ -9,6 +9,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
@@ -31,6 +32,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -384,24 +386,18 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWrite() throws Exception {
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         expectBatchedModifications(actorRef, 1);
-        expectReadyTransaction(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        // This sends the batched modification.
-        transactionProxy.ready();
-
-        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
     }
 
     @Test
@@ -456,7 +452,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         // This sends the batched modification.
         transactionProxy.ready();
 
-        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 BatchedModificationsReply.class);
@@ -479,48 +475,36 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMerge() throws Exception {
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         expectBatchedModifications(actorRef, 1);
-        expectReadyTransaction(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
-        // This sends the batched modification.
-        transactionProxy.ready();
-
-        verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
+        verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
     }
 
     @Test
     public void testDelete() throws Exception {
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
         expectBatchedModifications(actorRef, 1);
-        expectReadyTransaction(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
-        // This sends the batched modification.
-        transactionProxy.ready();
-
-        verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
+        verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
     }
 
     @Test
-    public void testReady() throws Exception {
+    public void testReadyWithReadWrite() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
@@ -550,18 +534,91 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
                 isA(BatchedModifications.class));
+
+        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
+                isA(ReadyTransaction.SERIALIZABLE_CLASS));
+    }
+
+    @Test
+    public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
+        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        expectBatchedModificationsReady(actorRef, 1);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
+
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), true,
+                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+        verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
+                isA(ReadyTransaction.SERIALIZABLE_CLASS));
+    }
+
+    @Test
+    public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
+        dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        expectBatchedModificationsReady(actorRef, 1);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                BatchedModificationsReply.class);
+
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), false,
+                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+        verifyBatchedModifications(batchedModifications.get(1), true);
+
+        verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
+                isA(ReadyTransaction.SERIALIZABLE_CLASS));
     }
 
     @Test
     public void testReadyWithRecordingOperationFailure() throws Exception {
+        dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
+
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         expectFailedBatchedModifications(actorRef);
 
-        expectReadyTransaction(actorRef);
-
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
@@ -581,15 +638,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testReadyWithReplyFailure() throws Exception {
+        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        expectBatchedModifications(actorRef, 1);
-
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)),
-                        isA(ReadyTransaction.SERIALIZABLE_CLASS));
+        expectFailedBatchedModifications(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -601,9 +656,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
-
         verifyCohortFutures(proxy, TestException.class);
     }
 
@@ -634,15 +686,16 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testReadyWithInvalidReplyMessageType() throws Exception {
+        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        expectBatchedModifications(actorRef, 1);
+        //expectBatchedModifications(actorRef, 1);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)),
-                        isA(ReadyTransaction.SERIALIZABLE_CLASS));
+                        isA(BatchedModifications.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -657,17 +710,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyCohortFutures(proxy, IllegalArgumentException.class);
     }
 
-    @Test
-    public void testUnusedTransaction() throws Exception {
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
-        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
-        assertEquals("canCommit", true, ready.canCommit().get());
-        ready.preCommit().get();
-        ready.commit().get();
-    }
-
     @Test
     public void testGetIdentifier() {
         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
@@ -711,24 +753,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
      */
     @Test
     public void testLocalTxActorRead() throws Exception {
-        ActorSystem actorSystem = getSystem();
-        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
-        doReturn(actorSystem.actorSelection(shardActorRef.path())).
-            when(mockActorContext).actorSelection(shardActorRef.path().toString());
-
-        doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
-            when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
-        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-            .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
-
-        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
-            executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                eqCreateTransaction(memberName, READ_ONLY));
-
-        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+        setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
+        doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
 
@@ -764,40 +790,20 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testLocalTxActorReady() throws Exception {
-        ActorSystem actorSystem = getSystem();
-        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
-        doReturn(actorSystem.actorSelection(shardActorRef.path())).
-            when(mockActorContext).actorSelection(shardActorRef.path().toString());
-
-        doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
-            when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
-        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
-            setTransactionId("txn-1").setTransactionActorPath(actorPath).
-            setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
-
-        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
-            executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                eqCreateTransaction(memberName, WRITE_ONLY));
-
-        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+        doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
         doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
                 any(ActorSelection.class), isA(BatchedModifications.class));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
-
         // testing ready
-        doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), isA(ReadyTransaction.class));
+        doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+            eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
@@ -805,7 +811,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
     }
 
     private static interface TransactionProxyOperation {
@@ -875,20 +881,23 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
             doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
         } else {
-            doReturn(Futures.failed(new Exception("not found")))
+            doReturn(Futures.failed(new PrimaryNotFoundException("test")))
                     .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
         }
 
-        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+        String actorPath = txActorRef.path().toString();
         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
                 setTransactionId("txn-1").setTransactionActorPath(actorPath).
                 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
+        doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
+
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                         eqCreateTransaction(memberName, READ_WRITE));
 
-        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+        doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
@@ -921,6 +930,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteThrottlingWhenShardFound(){
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -938,6 +948,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     @Test
     public void testWriteThrottlingWhenShardNotFound(){
         // Confirm that there is no throttling when the Shard is not found
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -956,6 +967,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testWriteCompletion(){
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -972,7 +984,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeThrottlingWhenShardFound(){
-
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -989,7 +1001,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeThrottlingWhenShardNotFound(){
-
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1006,6 +1018,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testMergeCompletion(){
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1054,6 +1067,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testDeleteCompletion(){
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
@@ -1210,13 +1224,11 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         }, 2, true);
     }
 
-    @Test
-    public void testModificationOperationBatching() throws Throwable {
+    private void testModificationOperationBatching(TransactionType type) throws Exception {
         int shardBatchedModificationCount = 3;
-        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
-                when(mockActorContext).getDatastoreContext();
+        dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
 
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
 
         expectBatchedModifications(actorRef, shardBatchedModificationCount);
 
@@ -1243,7 +1255,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
 
         transactionProxy.write(writePath1, writeNode1);
         transactionProxy.write(writePath2, writeNode2);
@@ -1260,24 +1272,46 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
 
-        verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+        verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
                 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
 
-        verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+        verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
 
-        verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
+        boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
+        verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
                 new DeleteModification(deletePath2));
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+        if(optimizedWriteOnly) {
+            verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                    BatchedModificationsReply.class, BatchedModificationsReply.class);
+        } else {
+            verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                    BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+        }
+    }
+
+    @Test
+    public void testReadWriteModificationOperationBatching() throws Throwable {
+        testModificationOperationBatching(READ_WRITE);
+    }
+
+    @Test
+    public void testWriteOnlyModificationOperationBatching() throws Throwable {
+        testModificationOperationBatching(WRITE_ONLY);
+    }
+
+    @Test
+    public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
+        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+        testModificationOperationBatching(WRITE_ONLY);
     }
 
     @Test
     public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
+
         int shardBatchedModificationCount = 10;
-        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
-                when(mockActorContext).getDatastoreContext();
+        dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
 
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
@@ -1333,13 +1367,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
 
-        verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+        verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
                 new WriteModification(writePath2, writeNode2));
 
-        verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+        verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
                 new MergeModification(mergePath2, mergeNode2));
 
-        verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
+        verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
 
         InOrder inOrder = Mockito.inOrder(mockActorContext);
         inOrder.verify(mockActorContext).executeOperationAsync(
index 08c32c9..2980f83 100644 (file)
@@ -15,10 +15,12 @@ import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
 import akka.dispatch.Futures;
 import com.google.common.base.Optional;
 import java.util.concurrent.TimeUnit;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
@@ -154,4 +156,36 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
     }
+
+    @Test
+    @Ignore
+    // FIXME: disabled until we can get the primary shard version from the ShardManager as we now skip
+    // creating transaction actors for write-only Tx's.
+    public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
+        short version = DataStoreVersions.HELIUM_2_VERSION;
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version);
+
+        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
+
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+        transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+    }
 }
index 15d2eea..c4027ad 100644 (file)
@@ -41,15 +41,19 @@ public class BatchedModificationsTest {
 
         YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
 
-        BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+        BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, "txChain");
         batched.addModification(new WriteModification(writePath, writeData));
         batched.addModification(new MergeModification(mergePath, mergeData));
         batched.addModification(new DeleteModification(deletePath));
+        batched.setReady(true);
 
         BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
                 (Serializable) batched.toSerializable());
 
         assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+        assertEquals("getTransactionID", "tx1", clone.getTransactionID());
+        assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
+        assertEquals("isReady", true, clone.isReady());
 
         assertEquals("getModifications size", 3, clone.getModifications().size());
 
@@ -66,6 +70,20 @@ public class BatchedModificationsTest {
         DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
         assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
         assertEquals("getPath", deletePath, delete.getPath());
+
+        // Test with different params.
+
+        batched = new BatchedModifications("tx2", (short)10, null);
+
+        clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
+
+        assertEquals("getVersion", 10, clone.getVersion());
+        assertEquals("getTransactionID", "tx2", clone.getTransactionID());
+        assertEquals("getTransactionChainID", "", clone.getTransactionChainID());
+        assertEquals("isReady", false, clone.isReady());
+
+        assertEquals("getModifications size", 0, clone.getModifications().size());
+
     }
 
     @Test
@@ -73,5 +91,11 @@ public class BatchedModificationsTest {
         BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
                 (Serializable) new BatchedModificationsReply(100).toSerializable());
         assertEquals("getNumBatched", 100, clone.getNumBatched());
+        assertEquals("getCohortPath", null, clone.getCohortPath());
+
+        clone = (BatchedModificationsReply) SerializationUtils.clone(
+                (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable());
+        assertEquals("getNumBatched", 50, clone.getNumBatched());
+        assertEquals("getCohortPath", "cohort path", clone.getCohortPath());
     }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.