Bug 2597: Batch modification operations in TransactionProxy 95/15095/7
authortpantelis <tpanteli@brocade.com>
Sun, 8 Feb 2015 17:17:16 +0000 (12:17 -0500)
committertpantelis <tpanteli@brocade.com>
Sat, 21 Feb 2015 18:15:38 +0000 (13:15 -0500)
Modified TransactionContextImpl to batch write, merge, delete modification
operations into a single BatchedModifications message and send the batch when a
count threshold is reached.

Instead of using the current WriteData, MergeData, DeleteData message classes
in BatchedModifications, I reused the Modification classes that are used
as the payload for peristence and replication. BatchedModifications
derives from MutableCompositeModification.

The ShardWriteTransaction now simply transfers the Modification
instances in the BatchedModifications instance to its internal
MutableCompositeModification and applies the Modifications to its
transaction.

The Modification classes were refactored a little wrt to versioning.
The Write/Merge/DeleteModifications no longer read/write the version
from the stream. The version is read/written by MutableCompositeModification
so it's redundant to also do so in the data Modification classes.

The WriteData, MergeData, DeleteData and associated reply classes were
deprecated. I did refactor them a little (along with ReadDataReply) as I
was originally going to use them. The VersionedSerializableMessage interface
was removed as I realized it makes more sense to pass the version in the WriteData,
MergeData, DeleteData constructors instead of passing the version via
toSerializable. This made it easier in TransactionContextImpl to transition it
to use BatchedModifications. I created a VersionedExternalizableMessage
base class that reads/write the version.

To handle backwards compatibility with Helium, I derived a
LegacyTransactionContextImpl class from TransactionContextImpl that overrides
writeData, mergeData and deleteData to send WriteData, MergeData,
DeleteData messages. TransactionProxy creates this instance if the
remote tx versions is < Lithium version.

Change-Id: I28df1f89e97667eaca114b991355a6e9d0160a59
Signed-off-by: tpantelis <tpanteli@brocade.com>
43 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java

index cee781fb88e535a04251f66b948e1d1123169a5c..20708335f3d9e6ee4834c7c5b867e701f26f0785 100644 (file)
@@ -42,6 +42,7 @@ public class DatastoreContext {
     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100;
 
     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
@@ -54,8 +55,9 @@ public class DatastoreContext {
     private boolean persistent = DEFAULT_PERSISTENT;
     private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
-    private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+    private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+    private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
 
     private DatastoreContext(){
         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
@@ -152,8 +154,12 @@ public class DatastoreContext {
         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
+    public int getShardBatchedModificationCount() {
+        return shardBatchedModificationCount;
+    }
+
     public static class Builder {
-        private DatastoreContext datastoreContext = new DatastoreContext();
+        private final DatastoreContext datastoreContext = new DatastoreContext();
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
             datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
@@ -246,6 +252,11 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
+            datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
+            return this;
+        }
+
         public DatastoreContext build() {
             return datastoreContext;
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java
new file mode 100644 (file)
index 0000000..65d82b7
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't
+ * support the BatchedModifications message.
+ *
+ * @author Thomas Pantelis
+ */
+class LegacyTransactionContextImpl extends TransactionContextImpl {
+
+    LegacyTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+            ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+            short remoteTransactionVersion, OperationCompleter operationCompleter) {
+        super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
+                remoteTransactionVersion,  operationCompleter);
+    }
+
+    @Override
+    public void deleteData(YangInstanceIdentifier path) {
+        recordedOperationFutures.add(executeOperationAsync(
+                new DeleteData(path, getRemoteTransactionVersion())));
+    }
+
+    @Override
+    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        recordedOperationFutures.add(executeOperationAsync(
+                new MergeData(path, data, getRemoteTransactionVersion())));
+    }
+
+    @Override
+    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        recordedOperationFutures.add(executeOperationAsync(
+                new WriteData(path, data, getRemoteTransactionVersion())));
+    }
+}
index 09fa61b570996919805aa24ff90d0975eb7c62c6..80aa3793c1a2daf140441152c80a03b2408b8dd6 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.Semaphore;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 
 final class OperationCompleter extends OnComplete<Object> {
     private final Semaphore operationLimiter;
@@ -19,7 +20,11 @@ final class OperationCompleter extends OnComplete<Object> {
     }
 
     @Override
-    public void onComplete(Throwable throwable, Object o){
-        this.operationLimiter.release();
+    public void onComplete(Throwable throwable, Object message) {
+        if(message instanceof BatchedModificationsReply) {
+            this.operationLimiter.release(((BatchedModificationsReply)message).getNumBatched());
+        } else {
+            this.operationLimiter.release();
+        }
     }
 }
\ No newline at end of file
index af25df13d2865ba867d6a529d3ad947101b1510a..613b3749e086abc8cdea38fd322872f656235a91 100644 (file)
@@ -129,9 +129,9 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         try {
             final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
             Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-            ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
+            ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
 
-            sender().tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): readDataReply), self());
+            sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
 
         } catch (Exception e) {
             LOG.debug(String.format("Unexpected error reading path %s", path), e);
index a4a2f45fdbdda87cc1166aa0e169214eea0df313..d5dcfde803a16bfcc6ea285473167d9b78e8c5cb 100644 (file)
@@ -13,6 +13,8 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
@@ -24,6 +26,7 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -37,7 +40,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  */
 public class ShardWriteTransaction extends ShardTransaction {
 
-    private final MutableCompositeModification modification = new MutableCompositeModification();
+    private final MutableCompositeModification compositeModification = new MutableCompositeModification();
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
@@ -55,18 +58,12 @@ public class ShardWriteTransaction extends ShardTransaction {
     @Override
     public void handleReceive(Object message) throws Exception {
 
-        if (message instanceof WriteData) {
-            writeData(transaction, (WriteData) message, !SERIALIZED_REPLY);
-
-        } else if (message instanceof MergeData) {
-            mergeData(transaction, (MergeData) message, !SERIALIZED_REPLY);
-
-        } else if (message instanceof DeleteData) {
-            deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
-
+        if (message instanceof BatchedModifications) {
+            batchedModifications((BatchedModifications)message);
         } else if (message instanceof ReadyTransaction) {
             readyTransaction(transaction, !SERIALIZED_REPLY);
-
+        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readyTransaction(transaction, SERIALIZED_REPLY);
         } else if(WriteData.isSerializedType(message)) {
             writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
 
@@ -76,22 +73,32 @@ public class ShardWriteTransaction extends ShardTransaction {
         } else if(DeleteData.isSerializedType(message)) {
             deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
 
-        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(transaction, SERIALIZED_REPLY);
-
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
-            getSender().tell(new GetCompositeModificationReply(modification), getSelf());
+            getSender().tell(new GetCompositeModificationReply(compositeModification), getSelf());
         } else {
             super.handleReceive(message);
         }
     }
 
+    private void batchedModifications(BatchedModifications batched) {
+        try {
+            for(Modification modification: batched.getModifications()) {
+                compositeModification.addModification(modification);
+                modification.apply(transaction);
+            }
+
+            getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+        } catch (Exception e) {
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
+    }
+
     private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
             boolean returnSerialized) {
         LOG.debug("writeData at path : {}", message.getPath());
 
-        modification.addModification(
+        compositeModification.addModification(
                 new WriteModification(message.getPath(), message.getData()));
         try {
             transaction.write(message.getPath(), message.getData());
@@ -107,7 +114,7 @@ public class ShardWriteTransaction extends ShardTransaction {
             boolean returnSerialized) {
         LOG.debug("mergeData at path : {}", message.getPath());
 
-        modification.addModification(
+        compositeModification.addModification(
                 new MergeModification(message.getPath(), message.getData()));
 
         try {
@@ -124,7 +131,7 @@ public class ShardWriteTransaction extends ShardTransaction {
             boolean returnSerialized) {
         LOG.debug("deleteData at path : {}", message.getPath());
 
-        modification.addModification(new DeleteModification(message.getPath()));
+        compositeModification.addModification(new DeleteModification(message.getPath()));
         try {
             transaction.delete(message.getPath());
             DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
@@ -143,7 +150,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
         getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
-                cohort, modification, returnSerialized), getContext());
+                cohort, compositeModification, returnSerialized), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
         getSelf().tell(PoisonPill.getInstance(), getSelf());
index 03d1b3a6d736541dca754e49fa6f35ea67b7ed2b..1e222e4c0a667307ce9b7cf3c24585b7f61d5911 100644 (file)
@@ -15,18 +15,19 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -36,7 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
-final class TransactionContextImpl extends AbstractTransactionContext {
+class TransactionContextImpl extends AbstractTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
     private final ActorContext actorContext;
@@ -44,8 +45,9 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     private final ActorSelection actor;
     private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
-    private final OperationCompleter operationCompleter;
 
+    private final OperationCompleter operationCompleter;
+    private BatchedModifications batchedModifications;
 
     TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
             ActorContext actorContext, SchemaContext schemaContext,
@@ -69,13 +71,12 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         return actor;
     }
 
-    private Future<Object> executeOperationAsync(SerializableMessage msg) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+    protected short getRemoteTransactionVersion() {
+        return remoteTransactionVersion;
     }
 
-    private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
-                msg.toSerializable(remoteTransactionVersion)));
+    protected Future<Object> executeOperationAsync(SerializableMessage msg) {
+        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
     }
 
     @Override
@@ -90,6 +91,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
                 identifier, recordedOperationFutures.size());
 
+        // Send the remaining batched modifications if any.
+
+        sendBatchedModifications();
+
         // Send the ReadyTransaction message to the Tx actor.
 
         final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
@@ -155,25 +160,48 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
+    private void batchModification(Modification modification) {
+        if(batchedModifications == null) {
+            batchedModifications = new BatchedModifications(remoteTransactionVersion);
+        }
+
+        batchedModifications.addModification(modification);
+
+        if(batchedModifications.getModifications().size() >=
+                actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
+            sendBatchedModifications();
+        }
+    }
+
+    private void sendBatchedModifications() {
+        if(batchedModifications != null) {
+            LOG.debug("Tx {} sending {} batched modifications", identifier,
+                    batchedModifications.getModifications().size());
+
+            recordedOperationFutures.add(executeOperationAsync(batchedModifications));
+            batchedModifications = null;
+        }
+    }
+
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         LOG.debug("Tx {} deleteData called path = {}", identifier, path);
 
-        recordedOperationFutures.add(executeOperationAsync(new DeleteData(path)));
+        batchModification(new DeleteModification(path));
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} mergeData called path = {}", identifier, path);
 
-        recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
+        batchModification(new MergeModification(path, data));
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} writeData called path = {}", identifier, path);
 
-        recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
+        batchModification(new WriteModification(path, data));
     }
 
     @Override
@@ -182,6 +210,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         LOG.debug("Tx {} readData called path = {}", identifier, path);
 
+        // Send the remaining batched modifications if any.
+
+        sendBatchedModifications();
+
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail the read.
@@ -263,6 +295,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         LOG.debug("Tx {} dataExists called path = {}", identifier, path);
 
+        // Send the remaining batched modifications if any.
+
+        sendBatchedModifications();
+
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail this
index d63ec8010dc714adc71c951bf7fd0a65937a4ce7..58b37be2a2727babd9b0305e868d85d90c079052 100644 (file)
@@ -304,7 +304,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     private void throttleOperation(int acquirePermits) {
         try {
-            if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+            if(!operationLimiter.tryAcquire(acquirePermits,
+                    actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
             }
         } catch (InterruptedException e) {
@@ -689,7 +690,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
             String transactionPath = reply.getTransactionPath();
 
-            LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
+            LOG.debug("Tx {} Received {}", identifier, reply);
 
             ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
 
@@ -707,8 +708,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
-            return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
+                return new TransactionContextImpl(transactionPath, transactionActor, identifier,
+                    actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            } else {
+                return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier,
+                        actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            }
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
new file mode 100644 (file)
index 0000000..670641f
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+
+/**
+ * Message used to batch write, merge, delete modification operations to the  ShardTransaction actor.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModifications extends MutableCompositeModification implements SerializableMessage {
+    private static final long serialVersionUID = 1L;
+
+    public BatchedModifications() {
+    }
+
+    public BatchedModifications(short version) {
+        super(version);
+    }
+
+    @Override
+    public Object toSerializable() {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java
new file mode 100644 (file)
index 0000000..33c5733
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * The reply for the BatchedModifications message.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModificationsReply extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
+
+    private int numBatched;
+
+    public BatchedModificationsReply() {
+    }
+
+    public BatchedModificationsReply(int numBatched) {
+        this.numBatched = numBatched;
+    }
+
+
+    public int getNumBatched() {
+        return numBatched;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        numBatched = in.readInt();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeInt(numBatched);
+    }
+
+    @Override
+    public Object toSerializable() {
+        return this;
+    }
+}
index ffd0f1ccf3cfcc79598aad6d03f171fb2c70de61..c2bf81fa8e75fd0b5da2a700d37ab142e4815042 100644 (file)
@@ -57,4 +57,11 @@ public class CreateTransactionReply implements SerializableMessage {
                 (short)o.getMessageVersion());
     }
 
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("CreateTransactionReply [transactionPath=").append(transactionPath).append(", transactionId=")
+                .append(transactionId).append(", version=").append(version).append("]");
+        return builder.toString();
+    }
 }
index 04bc63c5a5e73506dc9835ae4abf36915372bcee..5ba787c98322eb155ce5971a91807a5c11d3dc66 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -18,18 +17,22 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class DeleteData implements VersionedSerializableMessage, Externalizable {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class DeleteData extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
     public static final Class<DeleteData> SERIALIZABLE_CLASS = DeleteData.class;
 
     private YangInstanceIdentifier path;
-    private short version;
 
     public DeleteData() {
     }
 
-    public DeleteData(final YangInstanceIdentifier path) {
+    public DeleteData(final YangInstanceIdentifier path, short version) {
+        super(version);
         this.path = path;
     }
 
@@ -37,26 +40,21 @@ public class DeleteData implements VersionedSerializableMessage, Externalizable
         return path;
     }
 
-    public short getVersion() {
-        return version;
-    }
-
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        version = in.readShort(); // Read the version - don't need to do anything with it now
+        super.readExternal(in);
         path = SerializationUtils.deserializePath(in);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(version);
+        super.writeExternal(out);
         SerializationUtils.serializePath(path, out);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            version = toVersion;
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             // To base or R1 Helium version
@@ -71,7 +69,8 @@ public class DeleteData implements VersionedSerializableMessage, Externalizable
         } else {
             // From base or R1 Helium version
             ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
-            return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+            return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
index 0c6ff0e68d69d05e56871d3e197795387310232d..dd21b0e2e64071576b141c8062761bfa9491ef6d 100644 (file)
@@ -10,7 +10,12 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
 public class DeleteDataReply extends EmptyReply {
+    private static final long serialVersionUID = 1L;
 
     private static final Object LEGACY_SERIALIZED_INSTANCE =
             ShardTransactionMessages.DeleteDataReply.newBuilder().build();
index 284c6eff8d3488037fd63ca76b4326228bb4e3c4..38a37f0ccfedbe32f162db088533cee388b912fb 100644 (file)
@@ -14,7 +14,7 @@ import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
  *
  * @author Thomas Pantelis
  */
-public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage {
+public abstract class EmptyReply extends EmptyExternalizable {
 
     private final Object legacySerializedInstance;
 
@@ -23,7 +23,6 @@ public abstract class EmptyReply extends EmptyExternalizable implements Versione
         this.legacySerializedInstance = legacySerializedInstance;
     }
 
-    @Override
     public Object toSerializable(short toVersion) {
         return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance;
     }
index ae0d630cf267aa75b0ef403b8df7e7733d06e082..0f44733503ead5c1eb2a2e84bab07c284978fd3e 100644 (file)
@@ -16,7 +16,11 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class MergeData extends ModifyData implements VersionedSerializableMessage {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class MergeData extends ModifyData {
     private static final long serialVersionUID = 1L;
 
     public static final Class<MergeData> SERIALIZABLE_CLASS = MergeData.class;
@@ -24,14 +28,13 @@ public class MergeData extends ModifyData implements VersionedSerializableMessag
     public MergeData() {
     }
 
-    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        super(path, data);
+    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+        super(path, data, version);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            setVersion(toVersion);
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             // To base or R1 Helium version
@@ -50,7 +53,8 @@ public class MergeData extends ModifyData implements VersionedSerializableMessag
             ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
             Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
                     o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-            return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode());
+            return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
index a4c514bdbf0751b7399ba5928c367089a8210e82..6936ef14c52e6dbac89e9379d5d8230bf51a1d59 100644 (file)
@@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
 public class MergeDataReply extends EmptyReply {
     private static final long serialVersionUID = 1L;
 
index 69c41c2a5663f5021a7a22401aef48ca9b3986d2..bbd090f9291ccde682b33a46e748e6f618352754 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -17,17 +16,21 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Ap
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public abstract class ModifyData implements Externalizable {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public abstract class ModifyData extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
     private YangInstanceIdentifier path;
     private NormalizedNode<?, ?> data;
-    private short version;
 
     protected ModifyData() {
     }
 
-    protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+    protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+        super(version);
         this.path = path;
         this.data = data;
     }
@@ -40,23 +43,15 @@ public abstract class ModifyData implements Externalizable {
         return data;
     }
 
-    public short getVersion() {
-        return version;
-    }
-
-    protected void setVersion(short version) {
-        this.version = version;
-    }
-
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        version = in.readShort();
+        super.readExternal(in);
         SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(version);
+        super.writeExternal(out);
         SerializationUtils.serializePathAndNode(path, data, out);
     }
 
index 8ac6e1b1494a68fd3e7f2f04e0081a4d7f538a10..b0c163d87f346ccaefc300ce38f88573ab033b17 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.protobuf.ByteString;
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -19,18 +18,18 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class ReadDataReply implements VersionedSerializableMessage, Externalizable {
+public class ReadDataReply extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
     public static final Class<ReadDataReply> SERIALIZABLE_CLASS = ReadDataReply.class;
 
     private NormalizedNode<?, ?> normalizedNode;
-    private short version;
 
     public ReadDataReply() {
     }
 
-    public ReadDataReply(NormalizedNode<?, ?> normalizedNode) {
+    public ReadDataReply(NormalizedNode<?, ?> normalizedNode, short version) {
+        super(version);
         this.normalizedNode = normalizedNode;
     }
 
@@ -40,20 +39,19 @@ public class ReadDataReply implements VersionedSerializableMessage, Externalizab
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        version = in.readShort();
+        super.readExternal(in);
         normalizedNode = SerializationUtils.deserializeNormalizedNode(in);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(version);
+        super.writeExternal(out);
         SerializationUtils.serializeNormalizedNode(normalizedNode, out);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            version = toVersion;
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             return toSerializableReadDataReply(normalizedNode);
@@ -78,7 +76,8 @@ public class ReadDataReply implements VersionedSerializableMessage, Externalizab
         } else {
             ShardTransactionMessages.ReadDataReply o =
                     (ShardTransactionMessages.ReadDataReply) serializable;
-            return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()));
+            return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java
new file mode 100644 (file)
index 0000000..2a660fa
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Abstract base class for a versioned Externalizable message.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage {
+    private static final long serialVersionUID = 1L;
+
+    private short version;
+
+    public VersionedExternalizableMessage() {
+    }
+
+    public VersionedExternalizableMessage(short version) {
+        this.version = version;
+    }
+
+    public short getVersion() {
+        return version;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        version = in.readShort();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(version);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java
deleted file mode 100644 (file)
index 5c30b10..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-/**
- * Interface for a Serializable message with versioning.
- *
- * @author Thomas Pantelis
- */
-public interface VersionedSerializableMessage {
-    Object toSerializable(short toVersion);
-}
index 989949c88fb0f0c5400ea903119435a6d4ed0940..a4f648b6b3ccb1a99fe1f3c66241f6801c4290bc 100644 (file)
@@ -16,7 +16,11 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class WriteData extends ModifyData implements VersionedSerializableMessage {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class WriteData extends ModifyData {
     private static final long serialVersionUID = 1L;
 
     public static final Class<WriteData> SERIALIZABLE_CLASS = WriteData.class;
@@ -24,14 +28,13 @@ public class WriteData extends ModifyData implements VersionedSerializableMessag
     public WriteData() {
     }
 
-    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        super(path, data);
+    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+        super(path, data, version);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            setVersion(toVersion);
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             // To base or R1 Helium version
@@ -50,7 +53,8 @@ public class WriteData extends ModifyData implements VersionedSerializableMessag
             ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
             Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
                     o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-            return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode());
+            return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
index 8255828819cd494a93fc61dacd32bc48ba55edba..3455571a518f1bf656369a23f2ea202ee53d6271 100644 (file)
@@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
 public class WriteDataReply extends EmptyReply {
     private static final long serialVersionUID = 1L;
 
index f04d00440405deab7e5a8be141fd624fd3c30f35..77f0858d7b1ef9119fb5b2bff9039de0d30e4536 100644 (file)
@@ -17,8 +17,10 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public abstract class AbstractModification implements Modification {
 
     private YangInstanceIdentifier path;
+    private short version;
 
-    protected AbstractModification() {
+    protected AbstractModification(short version) {
+        this.version = version;
     }
 
     protected AbstractModification(YangInstanceIdentifier path) {
@@ -32,4 +34,8 @@ public abstract class AbstractModification implements Modification {
     public YangInstanceIdentifier getPath() {
         return path;
     }
+
+    public short getVersion() {
+        return version;
+    }
 }
index 833f86fb981f1179ce326c4d3703f17c3449aa73..3a63f5b17361a0015ac43ba293bcd54ee29339cb 100644 (file)
@@ -25,6 +25,11 @@ public class DeleteModification extends AbstractModification {
     private static final long serialVersionUID = 1L;
 
     public DeleteModification() {
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public DeleteModification(short version) {
+        super(version);
     }
 
     public DeleteModification(YangInstanceIdentifier path) {
@@ -43,13 +48,11 @@ public class DeleteModification extends AbstractModification {
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        in.readShort();
         setPath(SerializationUtils.deserializePath(in));
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(DataStoreVersions.CURRENT_VERSION);
         SerializationUtils.serializePath(getPath(), out);
     }
 
@@ -66,8 +69,9 @@ public class DeleteModification extends AbstractModification {
         return new DeleteModification(InstanceIdentifierUtils.fromSerializable(o.getPath()));
     }
 
-    public static DeleteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
-        DeleteModification mod = new DeleteModification();
+    public static DeleteModification fromStream(ObjectInput in, short version)
+            throws ClassNotFoundException, IOException {
+        DeleteModification mod = new DeleteModification(version);
         mod.readExternal(in);
         return mod;
     }
index 571443eedd3a89f2ce9d474cbfbd883919a8d7ff..7ba74f4e7ff63b42e82d81ab6e82f7f22b1ede6f 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.modification;
 
 import java.io.IOException;
 import java.io.ObjectInput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
@@ -24,6 +25,11 @@ public class MergeModification extends WriteModification {
     private static final long serialVersionUID = 1L;
 
     public MergeModification() {
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public MergeModification(short version) {
+        super(version);
     }
 
     public MergeModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
@@ -47,8 +53,9 @@ public class MergeModification extends WriteModification {
         return new MergeModification(decoded.getDecodedPath(), decoded.getDecodedNode());
     }
 
-    public static MergeModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
-        MergeModification mod = new MergeModification();
+    public static MergeModification fromStream(ObjectInput in, short version)
+            throws ClassNotFoundException, IOException {
+        MergeModification mod = new MergeModification(version);
         mod.readExternal(in);
         return mod;
     }
index 5d7947b19fc6ddaeafe133ec546dfc879dd07855..b597742319f08a2c04a8b633baf6d525e97dff14 100644 (file)
@@ -27,10 +27,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 public class MutableCompositeModification implements CompositeModification {
     private static final long serialVersionUID = 1L;
 
-    private final List<Modification> modifications;
+    private final List<Modification> modifications = new ArrayList<>();
+    private short version;
 
     public MutableCompositeModification() {
-        modifications = new ArrayList<>();
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public MutableCompositeModification(short version) {
+        this.version = version;
     }
 
     @Override
@@ -45,6 +50,14 @@ public class MutableCompositeModification implements CompositeModification {
         return COMPOSITE;
     }
 
+    public short getVersion() {
+        return version;
+    }
+
+    public void setVersion(short version) {
+        this.version = version;
+    }
+
     /**
      * Add a new Modification to the list of Modifications represented by this
      * composite
@@ -62,7 +75,7 @@ public class MutableCompositeModification implements CompositeModification {
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        in.readShort();
+        version = in.readShort();
 
         int size = in.readInt();
 
@@ -75,15 +88,15 @@ public class MutableCompositeModification implements CompositeModification {
                 byte type = in.readByte();
                 switch(type) {
                 case Modification.WRITE:
-                    modifications.add(WriteModification.fromStream(in));
+                    modifications.add(WriteModification.fromStream(in, version));
                     break;
 
                 case Modification.MERGE:
-                    modifications.add(MergeModification.fromStream(in));
+                    modifications.add(MergeModification.fromStream(in, version));
                     break;
 
                 case Modification.DELETE:
-                    modifications.add(DeleteModification.fromStream(in));
+                    modifications.add(DeleteModification.fromStream(in, version));
                     break;
                 }
             }
@@ -94,7 +107,7 @@ public class MutableCompositeModification implements CompositeModification {
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(DataStoreVersions.CURRENT_VERSION);
+        out.writeShort(version);
 
         out.writeInt(modifications.size());
 
@@ -121,8 +134,7 @@ public class MutableCompositeModification implements CompositeModification {
         builder.setTimeStamp(System.nanoTime());
 
         for (Modification m : modifications) {
-            builder.addModification(
-                    (PersistentMessages.Modification) m.toSerializable());
+            builder.addModification((PersistentMessages.Modification) m.toSerializable());
         }
 
         return builder.build();
index 9c122c9adeef8a14cf05bfa877d38a5cbe310ae2..2fdca5f3792161400bf5e8cbbb0f4e13222bcad5 100644 (file)
@@ -31,6 +31,11 @@ public class WriteModification extends AbstractModification {
     private NormalizedNode<?, ?> data;
 
     public WriteModification() {
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public WriteModification(short version) {
+        super(version);
     }
 
     public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
@@ -54,14 +59,11 @@ public class WriteModification extends AbstractModification {
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        in.readShort(); // version
-
         SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(DataStoreVersions.CURRENT_VERSION);
         SerializationUtils.serializePathAndNode(getPath(), data, out);
     }
 
@@ -81,8 +83,9 @@ public class WriteModification extends AbstractModification {
         return new WriteModification(decoded.getDecodedPath(), decoded.getDecodedNode());
     }
 
-    public static WriteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
-        WriteModification mod = new WriteModification();
+    public static WriteModification fromStream(ObjectInput in, short version)
+            throws ClassNotFoundException, IOException {
+        WriteModification mod = new WriteModification(version);
         mod.readExternal(in);
         return mod;
     }
index 7e8307465b9818d3c886098219065c9276b77e65..75e45b1d4f856b6036674096409d39d72fb51c8b 100644 (file)
@@ -1,12 +1,10 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.osgi.framework.BundleContext;
-
 import scala.concurrent.duration.Duration;
 
 public class DistributedConfigDataStoreProviderModule extends
@@ -68,6 +66,7 @@ public class DistributedConfigDataStoreProviderModule extends
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
                 .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
+                .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
index 0655468531a16fe7a9ad2032a5f56d6f948044e3..f5d46a9a7e2da12054f942c5c7d84297532db1b7 100644 (file)
@@ -1,12 +1,10 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.osgi.framework.BundleContext;
-
 import scala.concurrent.duration.Duration;
 
 public class DistributedOperationalDataStoreProviderModule extends
@@ -68,6 +66,7 @@ public class DistributedOperationalDataStoreProviderModule extends
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
                 .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
+                .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
index e2ee7373d0cfd3c4a68ec98a221fd1a121f4f13c..46918890b65b65f1b86b37194c7f91cda144ea17 100644 (file)
@@ -156,6 +156,15 @@ module distributed-datastore-provider {
                           an operation (eg transaction create).";
          }
 
+         leaf shard-batched-modification-count {
+            default 100;
+            type non-zero-uint32-type;
+            description "The number of transaction modification operations (put, merge, delete) to
+                        batch before sending to the shard transaction actor. Batching improves
+                        performance as less modifications messages are sent to the actor and thus
+                        lessens the chance that the transaction actor's mailbox queue could get full.";
+         }
+
          leaf enable-metric-capture {
             default false;
             type boolean;
index d3a3a8fc2df812b88ffc500c71e17d224189b541..0099b58dfbe92370dffa5a8e5ad0263b55abf8c9 100644 (file)
@@ -32,6 +32,7 @@ public class DatastoreContextTest {
         assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
         assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
         assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT, build.getShardBatchedModificationCount());
     }
 
-}
\ No newline at end of file
+}
index 66fa876277ca97a164a5b433fa3e11ecb50e69a7..4ec035ee3b52308e6390d9a2e4dece6703438318 100644 (file)
@@ -25,6 +25,7 @@ public class DistributedDataStoreTest extends AbstractActorTest {
         schemaContext = TestModel.createTestContext();
 
         doReturn(schemaContext).when(actorContext).getSchemaContext();
+        doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
     }
 
     @Test
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java
new file mode 100644 (file)
index 0000000..e7afe26
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import java.util.concurrent.Semaphore;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+
+/**
+ * Unit tests for OperationCompleter.
+ *
+ * @author Thomas Pantelis
+ */
+public class OperationCompleterTest {
+
+    @Test
+    public void testOnComplete() throws Exception {
+        int permits = 10;
+        Semaphore operationLimiter = new Semaphore(permits);
+        operationLimiter.acquire(permits);
+        int availablePermits = 0;
+
+        OperationCompleter completer = new OperationCompleter(operationLimiter );
+
+        completer.onComplete(null, new DataExistsReply(true));
+        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+        completer.onComplete(null, new DataExistsReply(true));
+        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+        completer.onComplete(null, new IllegalArgumentException());
+        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+        completer.onComplete(null, new BatchedModificationsReply(4));
+        availablePermits += 4;
+        assertEquals("availablePermits", availablePermits, operationLimiter.availablePermits());
+    }
+}
index 58cec67a2d6cce1f30e3cdf41fdaf7b7d23185a7..1d1b08b5f8d57db68e9690cf1045fec5cff10bcc 100644 (file)
@@ -79,8 +79,8 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
-                            DataStoreVersions.BASE_HELIUM_VERSION), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.BASE_HELIUM_VERSION).
+                        toSerializable(), getRef());
 
             expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
 
@@ -153,9 +153,11 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                    DataStoreVersions.BASE_HELIUM_VERSION).toSerializable(), getRef());
 
-            expectMsgClass(duration, WriteDataReply.class);
+            expectMsgClass(duration, WriteDataReply.INSTANCE.toSerializable(
+                    DataStoreVersions.BASE_HELIUM_VERSION).getClass());
 
             // Ready the Tx
 
index 851fb0114b3a64c7d211a5c6375c14c41e715e69..2973d277f5d8a4d4ce4576ba8dd8bc977295a1c8 100644 (file)
@@ -14,10 +14,14 @@ import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
@@ -46,9 +50,11 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
@@ -250,8 +256,8 @@ public class ShardTransactionTest extends AbstractActorTest {
                     "testOnReceiveWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
-                            DataStoreVersions.HELIUM_2_VERSION), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
+                        toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
 
@@ -259,7 +265,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
             // unserialized write
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
@@ -293,8 +299,8 @@ public class ShardTransactionTest extends AbstractActorTest {
                     "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
-                            DataStoreVersions.HELIUM_2_VERSION), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
+                        toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
 
@@ -302,7 +308,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
             //unserialized merge
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
@@ -335,20 +341,73 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
                     "testDeleteData");
 
-            transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(
-                    DataStoreVersions.HELIUM_2_VERSION), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
+                    toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
 
             assertModification(transaction, DeleteModification.class);
 
             //unserialized
-            transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
 
             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveBatchedModifications() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+            final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
+
+            YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+            NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                    withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+            YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+            NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+            YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+            BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+            batched.addModification(new WriteModification(writePath, writeData));
+            batched.addModification(new MergeModification(mergePath, mergeData));
+            batched.addModification(new DeleteModification(deletePath));
+
+            transaction.tell(batched, getRef());
+
+            BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+            assertEquals("getNumBatched", 3, reply.getNumBatched());
+
+            JavaTestKit verification = new JavaTestKit(getSystem());
+            transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
+
+            CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
+                        GetCompositeModificationReply.class).getModification();
+
+            assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
+
+            WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
+            assertEquals("getPath", writePath, write.getPath());
+            assertEquals("getData", writeData, write.getData());
+
+            MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
+            assertEquals("getPath", mergePath, merge.getPath());
+            assertEquals("getData", mergeData, merge.getData());
+
+            DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
+            assertEquals("getPath", deletePath, delete.getPath());
+
+            InOrder inOrder = Mockito.inOrder(mockWriteTx);
+            inOrder.verify(mockWriteTx).write(writePath, writeData);
+            inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
+            inOrder.verify(mockWriteTx).delete(deletePath);
+        }};
+    }
 
     @Test
     public void testOnReceiveReadyTransaction() throws Exception {
@@ -463,8 +522,8 @@ public class ShardTransactionTest extends AbstractActorTest {
                 DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
-        transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(
-                DataStoreVersions.CURRENT_VERSION), ActorRef.noSender());
+        transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
+                toSerializable(), ActorRef.noSender());
     }
 
     @Test
index 23c3a82a38255e9bfff2708583c4370ccb6ccf9a..88ab0dd292b4894f0eac4c8ae782452d9d696471 100644 (file)
@@ -43,6 +43,7 @@ public class TransactionChainProxyTest extends AbstractActorTest{
         actorContext.setSchemaContext(schemaContext);
 
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+        doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext();
     }
 
     @SuppressWarnings("resource")
index fa2f9187d6059f1585a1475531556d8563db3c5a..6573308c12100914badbedc5d0296b90e096a2b7 100644 (file)
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -39,13 +40,18 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -60,6 +66,11 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -71,6 +82,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -102,7 +114,10 @@ public class TransactionProxyTest {
     @Mock
     private ClusterWrapper mockClusterWrapper;
 
-    String memberName = "mock-member";
+    private final String memberName = "mock-member";
+
+    private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
+            shardBatchedModificationCount(1);
 
     @BeforeClass
     public static void setUpClass() throws IOException {
@@ -126,15 +141,13 @@ public class TransactionProxyTest {
 
         schemaContext = TestModel.createTestContext();
 
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
-
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
-        doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+        doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         ShardStrategyFactory.setConfiguration(configuration);
@@ -187,11 +200,15 @@ public class TransactionProxyTest {
     }
 
     private ReadData eqSerializedReadData() {
+        return eqSerializedReadData(TestModel.TEST_PATH);
+    }
+
+    private ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
             @Override
             public boolean matches(Object argument) {
                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+                       ReadData.fromSerializable(argument).getPath().equals(path);
             }
         };
 
@@ -210,23 +227,13 @@ public class TransactionProxyTest {
         return argThat(matcher);
     }
 
-    private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
-        return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
-            final int transactionVersion) {
+    private WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
             @Override
             public boolean matches(Object argument) {
-                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
-                        WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
-                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
-                           ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
-
+                if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) {
                     WriteData obj = WriteData.fromSerializable(argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                           obj.getData().equals(nodeToWrite);
+                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
                 }
 
                 return false;
@@ -236,39 +243,13 @@ public class TransactionProxyTest {
         return argThat(matcher);
     }
 
-    private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
-        ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(argument instanceof WriteData) {
-                    WriteData obj = (WriteData) argument;
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                        obj.getData().equals(nodeToWrite);
-                }
-                return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
-        return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
-            final int transactionVersion) {
+    private MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
             @Override
             public boolean matches(Object argument) {
-                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
-                        MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
-                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
-                           ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
-
+                if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) {
                     MergeData obj = MergeData.fromSerializable(argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                           obj.getData().equals(nodeToWrite);
+                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
                 }
 
                 return false;
@@ -278,41 +259,12 @@ public class TransactionProxyTest {
         return argThat(matcher);
     }
 
-    private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
-        ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(argument instanceof MergeData) {
-                    MergeData obj = ((MergeData) argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                        obj.getData().equals(nodeToWrite);
-                }
-
-               return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private DeleteData eqSerializedDeleteData() {
-        ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-        private DeleteData eqDeleteData() {
+    private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
             @Override
             public boolean matches(Object argument) {
-                return argument instanceof DeleteData &&
-                    ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
+                return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) &&
+                       DeleteData.fromSerializable(argument).getPath().equals(expPath);
             }
         };
 
@@ -329,7 +281,7 @@ public class TransactionProxyTest {
 
     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
             short transactionVersion) {
-        return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+        return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable());
     }
 
     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
@@ -337,7 +289,7 @@ public class TransactionProxyTest {
     }
 
     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(data));
+        return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
     }
 
     private Future<Object> dataExistsSerializedReply(boolean exists) {
@@ -348,48 +300,41 @@ public class TransactionProxyTest {
         return Futures.successful(new DataExistsReply(exists));
     }
 
-    private Future<Object> writeSerializedDataReply(short version) {
-        return Futures.successful(new WriteDataReply().toSerializable(version));
-    }
-
-    private Future<Object> writeSerializedDataReply() {
-        return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private Future<WriteDataReply> writeDataReply() {
-        return Futures.successful(new WriteDataReply());
-    }
-
-    private Future<Object> mergeSerializedDataReply(short version) {
-        return Futures.successful(new MergeDataReply().toSerializable(version));
-    }
-
-    private Future<Object> mergeSerializedDataReply() {
-        return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+    private Future<BatchedModificationsReply> batchedModificationsReply(int count) {
+        return Futures.successful(new BatchedModificationsReply(count));
     }
 
     private Future<Object> incompleteFuture(){
         return mock(Future.class);
     }
 
-    private Future<MergeDataReply> mergeDataReply() {
-        return Futures.successful(new MergeDataReply());
+    private ActorSelection actorSelection(ActorRef actorRef) {
+        return getSystem().actorSelection(actorRef.path());
+    }
+
+    private void expectBatchedModifications(ActorRef actorRef, int count) {
+        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
-    private Future<Object> deleteSerializedDataReply(short version) {
-        return Futures.successful(new DeleteDataReply().toSerializable(version));
+    private void expectBatchedModifications(int count) {
+        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
     }
 
-    private Future<Object> deleteSerializedDataReply() {
-        return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+    private void expectIncompleteBatchedModifications() {
+        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
     }
 
-    private Future<DeleteDataReply> deleteDataReply() {
-        return Futures.successful(new DeleteDataReply());
+    private void expectReadyTransaction(ActorRef actorRef) {
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
     }
 
-    private ActorSelection actorSelection(ActorRef actorRef) {
-        return getSystem().actorSelection(actorRef.path());
+    private void expectFailedBatchedModifications(ActorRef actorRef) {
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
     private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
@@ -446,8 +391,7 @@ public class TransactionProxyTest {
     public void testRead() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
@@ -476,8 +420,7 @@ public class TransactionProxyTest {
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
     }
@@ -489,8 +432,7 @@ public class TransactionProxyTest {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
     }
@@ -541,21 +483,19 @@ public class TransactionProxyTest {
 
     @Test(expected = TestException.class)
     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
+                when(mockActorContext).getDatastoreContext();
+
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        expectFailedBatchedModifications(actorRef);
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -575,14 +515,12 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
 
@@ -590,16 +528,19 @@ public class TransactionProxyTest {
                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
-
         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+
+        InOrder inOrder = Mockito.inOrder(mockActorContext);
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
     }
 
     @Test(expected=IllegalStateException.class)
     public void testReadPreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
-
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
         transactionProxy.read(TestModel.TEST_PATH);
     }
 
@@ -625,8 +566,7 @@ public class TransactionProxyTest {
     public void testExists() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
@@ -673,23 +613,21 @@ public class TransactionProxyTest {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
     }
 
     @Test(expected = TestException.class)
     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
+                when(mockActorContext).getDatastoreContext();
+
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        expectFailedBatchedModifications(actorRef);
 
         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
@@ -715,28 +653,30 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", true, exists);
+
+        InOrder inOrder = Mockito.inOrder(mockActorContext);
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedDataExists());
     }
 
     @Test(expected=IllegalStateException.class)
     public void testExistsPreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
-
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
         transactionProxy.exists(TestModel.TEST_PATH);
     }
 
@@ -757,7 +697,7 @@ public class TransactionProxyTest {
                     // Expected
                 }
             } else {
-                assertEquals("Recording operation Future result type", expResultType,
+                assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
             }
         }
@@ -769,19 +709,20 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     @Test
@@ -796,10 +737,10 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
@@ -833,33 +774,28 @@ public class TransactionProxyTest {
             throw caughtEx.get();
         }
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     @Test(expected=IllegalStateException.class)
     public void testWritePreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
-
-        transactionProxy.write(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
     }
 
     @Test(expected=IllegalStateException.class)
     public void testWriteAfterReadyPreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.ready();
 
-        transactionProxy.write(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
     }
 
     @Test
@@ -868,37 +804,40 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     @Test
     public void testDelete() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
-        doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                DeleteDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
@@ -935,14 +874,10 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -955,9 +890,12 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.class);
+                BatchedModificationsReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
+                isA(BatchedModifications.class));
     }
 
     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
@@ -969,14 +907,16 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+        doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
 
-        doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
+        doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+        doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
+
+        expectReadyTransaction(actorRef);
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
@@ -993,6 +933,8 @@ public class TransactionProxyTest {
 
         transactionProxy.merge(TestModel.TEST_PATH, testNode);
 
+        transactionProxy.delete(TestModel.TEST_PATH);
+
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
@@ -1000,7 +942,8 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
+                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class,
+                ShardTransactionMessages.DeleteDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
@@ -1029,21 +972,13 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        expectFailedBatchedModifications(actorRef);
 
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+        expectReadyTransaction(actorRef);
 
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
-
-        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1055,8 +990,7 @@ public class TransactionProxyTest {
 
         verifyCohortFutures(proxy, TestException.class);
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.class, TestException.class);
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
     }
 
     @Test
@@ -1065,15 +999,13 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)),
                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1084,7 +1016,7 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.class);
+                BatchedModificationsReply.class);
 
         verifyCohortFutures(proxy, TestException.class);
     }
@@ -1095,8 +1027,7 @@ public class TransactionProxyTest {
         doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
                 mockActorContext).findPrimaryShardAsync(anyString());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -1121,15 +1052,13 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)),
                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1160,8 +1089,7 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -1197,9 +1125,7 @@ public class TransactionProxyTest {
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-            .setTransactionId("txn-1")
-            .setTransactionActorPath(actorPath)
-            .build();
+            .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
@@ -1240,7 +1166,7 @@ public class TransactionProxyTest {
     }
 
     @Test
-    public void testLocalTxActorWrite() throws Exception {
+    public void testLocalTxActorReady() throws Exception {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
@@ -1251,48 +1177,26 @@ public class TransactionProxyTest {
             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-            .setTransactionId("txn-1")
-            .setTransactionActorPath(actorPath)
-            .build();
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+            setTransactionId("txn-1").setTransactionActorPath(actorPath).
+            setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
-        executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+            executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                 eqCreateTransaction(memberName, WRITE_ONLY));
 
         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
 
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqWriteData(nodeToWrite));
+        doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        verify(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqWriteData(nodeToWrite));
-
-        //testing local merge
-        doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqMergeData(nodeToWrite));
-
-        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
-        verify(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqMergeData(nodeToWrite));
-
-
-        //testing local delete
-        doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqDeleteData());
 
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-            WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
+                BatchedModificationsReply.class);
 
         // testing ready
         doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
@@ -1333,10 +1237,9 @@ public class TransactionProxyTest {
         }
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-                .setTransactionId("txn-1")
-                .setTransactionActorPath(actorPath)
-                .build();
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+                setTransactionId("txn-1").setTransactionActorPath(actorPath).
+                setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
@@ -1352,9 +1255,9 @@ public class TransactionProxyTest {
 
         long end = System.nanoTime();
 
-        Assert.assertTrue(String.format("took less time than expected %s was %s",
-                TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
-                (end-start)), (end - start) > TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+                expected, (end-start)), (end - start) > expected);
 
     }
 
@@ -1380,10 +1283,9 @@ public class TransactionProxyTest {
         }
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-                .setTransactionId("txn-1")
-                .setTransactionActorPath(actorPath)
-                .build();
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+                setTransactionId("txn-1").setTransactionActorPath(actorPath).
+                setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
@@ -1399,9 +1301,9 @@ public class TransactionProxyTest {
 
         long end = System.nanoTime();
 
-        Assert.assertTrue(String.format("took more time than expected %s was %s",
-                TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
-                (end-start)), (end - start) <= TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+                expected, (end-start)), (end - start) <= expected);
     }
 
     public void testWriteThrottling(boolean shardFound){
@@ -1411,8 +1313,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectBatchedModifications(2);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1428,15 +1329,13 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectIncompleteBatchedModifications();
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
             }
         });
-
     }
 
     @Test
@@ -1447,8 +1346,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectBatchedModifications(2);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1466,15 +1364,13 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+                expectBatchedModifications(2);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
             }
         });
-
     }
 
     @Test
@@ -1485,8 +1381,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+                expectIncompleteBatchedModifications();
 
                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
 
@@ -1503,8 +1398,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+                expectBatchedModifications(2);
 
                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
 
@@ -1520,8 +1414,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+                expectBatchedModifications(2);
 
                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
 
@@ -1537,8 +1430,7 @@ public class TransactionProxyTest {
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqDeleteData());
+                expectIncompleteBatchedModifications();
 
                 transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -1554,8 +1446,7 @@ public class TransactionProxyTest {
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqDeleteData());
+                expectBatchedModifications(2);
 
                 transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -1569,8 +1460,7 @@ public class TransactionProxyTest {
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
-                doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqDeleteData());
+                expectBatchedModifications(2);
 
                 transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -1688,8 +1578,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectBatchedModifications(1);
 
                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
                         any(ActorSelection.class), any(ReadyTransaction.class));
@@ -1710,11 +1599,7 @@ public class TransactionProxyTest {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
 
-                doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
-
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(carsNode));
+                expectBatchedModifications(2);
 
                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
                         any(ActorSelection.class), any(ReadyTransaction.class));
@@ -1727,4 +1612,203 @@ public class TransactionProxyTest {
             }
         }, 2, true);
     }
+
+    @Test
+    public void testModificationOperationBatching() throws Throwable {
+        int shardBatchedModificationCount = 3;
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
+                when(mockActorContext).getDatastoreContext();
+
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        expectBatchedModifications(actorRef, shardBatchedModificationCount);
+
+        expectReadyTransaction(actorRef);
+
+        YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
+        NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+        YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
+        NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+        YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
+        YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.write(writePath1, writeNode1);
+        transactionProxy.write(writePath2, writeNode2);
+        transactionProxy.delete(deletePath1);
+        transactionProxy.merge(mergePath1, mergeNode1);
+        transactionProxy.merge(mergePath2, mergeNode2);
+        transactionProxy.write(writePath3, writeNode3);
+        transactionProxy.merge(mergePath3, mergeNode3);
+        transactionProxy.delete(deletePath2);
+
+        // This sends the last batch.
+        transactionProxy.ready();
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+                new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
+
+        verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+                new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
+
+        verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
+                new DeleteModification(deletePath2));
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+    }
+
+    @Test
+    public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
+        int shardBatchedModificationCount = 10;
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
+                when(mockActorContext).getDatastoreContext();
+
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        expectBatchedModifications(actorRef, shardBatchedModificationCount);
+
+        YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
+        NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+        YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
+
+        doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+
+        doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+
+        doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedDataExists());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.write(writePath1, writeNode1);
+        transactionProxy.write(writePath2, writeNode2);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
+                get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
+
+        transactionProxy.merge(mergePath1, mergeNode1);
+        transactionProxy.merge(mergePath2, mergeNode2);
+
+        readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
+
+        transactionProxy.delete(deletePath);
+
+        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+        assertEquals("Exists response", true, exists);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+                new WriteModification(writePath2, writeNode2));
+
+        verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+                new MergeModification(mergePath2, mergeNode2));
+
+        verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
+
+        InOrder inOrder = Mockito.inOrder(mockActorContext);
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedDataExists());
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+    }
+
+    private List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
+        ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
+                ArgumentCaptor.forClass(BatchedModifications.class);
+        verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
+                eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
+
+        List<BatchedModifications> batchedModifications = filterCaptured(
+                batchedModificationsCaptor, BatchedModifications.class);
+        return batchedModifications;
+    }
+
+    private <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
+        List<T> captured = new ArrayList<>();
+        for(T c: captor.getAllValues()) {
+            if(type.isInstance(c)) {
+                captured.add(c);
+            }
+        }
+
+        return captured;
+    }
+
+    private void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), expected);
+    }
+
+    private void verifyBatchedModifications(Object message, Modification... expected) {
+        assertEquals("Message type", BatchedModifications.class, message.getClass());
+        BatchedModifications batchedModifications = (BatchedModifications)message;
+        assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+        for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
+            Modification actual = batchedModifications.getModifications().get(i);
+            assertEquals("Modification type", expected[i].getClass(), actual.getClass());
+            assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
+                    ((AbstractModification)actual).getPath());
+            if(actual instanceof WriteModification) {
+                assertEquals("getData", ((WriteModification)expected[i]).getData(),
+                        ((WriteModification)actual).getData());
+            }
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java
new file mode 100644 (file)
index 0000000..15d2eea
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for BatchedModifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModificationsTest {
+
+    @Test
+    public void testSerialization() {
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+        YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+        BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(writePath, writeData));
+        batched.addModification(new MergeModification(mergePath, mergeData));
+        batched.addModification(new DeleteModification(deletePath));
+
+        BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
+                (Serializable) batched.toSerializable());
+
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+
+        assertEquals("getModifications size", 3, clone.getModifications().size());
+
+        WriteModification write = (WriteModification)clone.getModifications().get(0);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion());
+        assertEquals("getPath", writePath, write.getPath());
+        assertEquals("getData", writeData, write.getData());
+
+        MergeModification merge = (MergeModification)clone.getModifications().get(1);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion());
+        assertEquals("getPath", mergePath, merge.getPath());
+        assertEquals("getData", mergeData, merge.getData());
+
+        DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
+        assertEquals("getPath", deletePath, delete.getPath());
+    }
+
+    @Test
+    public void testBatchedModificationsReplySerialization() {
+        BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
+                (Serializable) new BatchedModificationsReply(100).toSerializable());
+        assertEquals("getNumBatched", 100, clone.getNumBatched());
+    }
+}
index e950b78ab7d609a6b0aac6e34431cbb5a3f36584..97bade152e0484dd832aacf4c962cc58b6c77d11 100644 (file)
@@ -22,21 +22,22 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  *
  * @author Thomas Pantelis
  */
+@Deprecated
 public class DeleteDataTest {
 
     @Test
     public void testSerialization() {
         YangInstanceIdentifier path = TestModel.TEST_PATH;
 
-        DeleteData expected = new DeleteData(path);
+        DeleteData expected = new DeleteData(path, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", DeleteData.class, serialized.getClass());
         assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)serialized).getVersion());
 
         Object clone = SerializationUtils.clone((Serializable) serialized);
-        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)clone).getVersion());
         DeleteData actual = DeleteData.fromSerializable(clone);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getPath", expected.getPath(), actual.getPath());
     }
 
@@ -58,9 +59,9 @@ public class DeleteDataTest {
     public void testSerializationWithHeliumR1Version() throws Exception {
         YangInstanceIdentifier path = TestModel.TEST_PATH;
 
-        DeleteData expected = new DeleteData(path);
+        DeleteData expected = new DeleteData(path, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.DeleteData.class, serialized.getClass());
 
         DeleteData actual = DeleteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
index 5b40afdff8288ff714cc4f9b4a398e2f2724369b..011d22798e628ee4800f980a4718667781ffdb16 100644 (file)
@@ -14,6 +14,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
+@Deprecated
 public class MergeDataTest {
 
     @Test
@@ -23,15 +24,15 @@ public class MergeDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        MergeData expected = new MergeData(path, data);
+        MergeData expected = new MergeData(path, data, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", MergeData.class, serialized.getClass());
         assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion());
 
         Object clone = SerializationUtils.clone((Serializable) serialized);
-        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion());
         MergeData actual = MergeData.fromSerializable(clone);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getPath", expected.getPath(), actual.getPath());
         assertEquals("getData", expected.getData(), actual.getData());
     }
@@ -58,9 +59,9 @@ public class MergeDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        MergeData expected = new MergeData(path, data);
+        MergeData expected = new MergeData(path, data, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass());
 
         MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
index 8ce73296c14cd0af90491ae4f87e99fa85637c5e..7ad45a61702607cdaf03a61b23573575d54830cd 100644 (file)
@@ -32,13 +32,14 @@ public class ReadDataReplyTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        ReadDataReply expected = new ReadDataReply(data);
+        ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ReadDataReply.class, serialized.getClass());
 
         ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
                 (Serializable) serialized));
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
     }
 
@@ -60,9 +61,9 @@ public class ReadDataReplyTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        ReadDataReply expected = new ReadDataReply(data);
+        ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass());
 
         ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
index 90a76f229e1ddd045c084e56b1b854a518b43a92..8148c9ca9528fc709ad1ecaf3febbda265bc033d 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableCo
  *
  * @author Thomas Pantelis
  */
+@Deprecated
 public class WriteDataTest {
 
     @Test
@@ -35,15 +36,15 @@ public class WriteDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteData expected = new WriteData(path, data);
+        WriteData expected = new WriteData(path, data, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", WriteData.class, serialized.getClass());
         assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion());
 
         Object clone = SerializationUtils.clone((Serializable) serialized);
-        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion());
         WriteData actual = WriteData.fromSerializable(clone);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getPath", expected.getPath(), actual.getPath());
         assertEquals("getData", expected.getData(), actual.getData());
     }
@@ -69,9 +70,9 @@ public class WriteDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteData expected = new WriteData(path, data);
+        WriteData expected = new WriteData(path, data, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass());
 
         WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
index b9d44b2586f75a34b163322f9888b94419dd7cee..e2acaa8d10789d2d1e1f2b5a6f127b5a1c7a3d9a 100644 (file)
@@ -7,6 +7,7 @@ import com.google.common.base.Stopwatch;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -53,17 +54,22 @@ public class MutableCompositeModificationTest extends AbstractModificationTest {
 
         MutableCompositeModification clone = (MutableCompositeModification) SerializationUtils.clone(compositeModification);
 
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+
         assertEquals("getModifications size", 3, clone.getModifications().size());
 
         WriteModification write = (WriteModification)clone.getModifications().get(0);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion());
         assertEquals("getPath", writePath, write.getPath());
         assertEquals("getData", writeData, write.getData());
 
         MergeModification merge = (MergeModification)clone.getModifications().get(1);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion());
         assertEquals("getPath", mergePath, merge.getPath());
         assertEquals("getData", mergeData, merge.getData());
 
         DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
         assertEquals("getPath", deletePath, delete.getPath());
     }
 
index 67fa0960cbcb96cc2f617148aab2c7a7a7bab71d..9761ed8615a763df20ec6763defc69b55395a120 100644 (file)
@@ -31,7 +31,10 @@ public class TestModel {
   private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
 
   public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
-  public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build();
+  public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+          node(OUTER_LIST_QNAME).build();
+  public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+          node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
   public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
   public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");