Merge "Bug 2597: Batch modification operations in TransactionProxy"
authorMoiz Raja <moraja@cisco.com>
Wed, 25 Feb 2015 00:52:17 +0000 (00:52 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 25 Feb 2015 00:52:17 +0000 (00:52 +0000)
43 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java

index cee781fb88e535a04251f66b948e1d1123169a5c..20708335f3d9e6ee4834c7c5b867e701f26f0785 100644 (file)
@@ -42,6 +42,7 @@ public class DatastoreContext {
     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100;
 
     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
@@ -54,8 +55,9 @@ public class DatastoreContext {
     private boolean persistent = DEFAULT_PERSISTENT;
     private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
-    private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+    private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+    private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
 
     private DatastoreContext(){
         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
@@ -152,8 +154,12 @@ public class DatastoreContext {
         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
+    public int getShardBatchedModificationCount() {
+        return shardBatchedModificationCount;
+    }
+
     public static class Builder {
-        private DatastoreContext datastoreContext = new DatastoreContext();
+        private final DatastoreContext datastoreContext = new DatastoreContext();
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
             datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
@@ -246,6 +252,11 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
+            datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
+            return this;
+        }
+
         public DatastoreContext build() {
             return datastoreContext;
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java
new file mode 100644 (file)
index 0000000..65d82b7
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't
+ * support the BatchedModifications message.
+ *
+ * @author Thomas Pantelis
+ */
+class LegacyTransactionContextImpl extends TransactionContextImpl {
+
+    LegacyTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+            ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+            short remoteTransactionVersion, OperationCompleter operationCompleter) {
+        super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
+                remoteTransactionVersion,  operationCompleter);
+    }
+
+    @Override
+    public void deleteData(YangInstanceIdentifier path) {
+        recordedOperationFutures.add(executeOperationAsync(
+                new DeleteData(path, getRemoteTransactionVersion())));
+    }
+
+    @Override
+    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        recordedOperationFutures.add(executeOperationAsync(
+                new MergeData(path, data, getRemoteTransactionVersion())));
+    }
+
+    @Override
+    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        recordedOperationFutures.add(executeOperationAsync(
+                new WriteData(path, data, getRemoteTransactionVersion())));
+    }
+}
index 09fa61b570996919805aa24ff90d0975eb7c62c6..80aa3793c1a2daf140441152c80a03b2408b8dd6 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.Semaphore;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 
 final class OperationCompleter extends OnComplete<Object> {
     private final Semaphore operationLimiter;
@@ -19,7 +20,11 @@ final class OperationCompleter extends OnComplete<Object> {
     }
 
     @Override
-    public void onComplete(Throwable throwable, Object o){
-        this.operationLimiter.release();
+    public void onComplete(Throwable throwable, Object message) {
+        if(message instanceof BatchedModificationsReply) {
+            this.operationLimiter.release(((BatchedModificationsReply)message).getNumBatched());
+        } else {
+            this.operationLimiter.release();
+        }
     }
 }
\ No newline at end of file
index af25df13d2865ba867d6a529d3ad947101b1510a..613b3749e086abc8cdea38fd322872f656235a91 100644 (file)
@@ -129,9 +129,9 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         try {
             final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
             Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-            ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
+            ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
 
-            sender().tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): readDataReply), self());
+            sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
 
         } catch (Exception e) {
             LOG.debug(String.format("Unexpected error reading path %s", path), e);
index a4a2f45fdbdda87cc1166aa0e169214eea0df313..d5dcfde803a16bfcc6ea285473167d9b78e8c5cb 100644 (file)
@@ -13,6 +13,8 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
@@ -24,6 +26,7 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -37,7 +40,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  */
 public class ShardWriteTransaction extends ShardTransaction {
 
-    private final MutableCompositeModification modification = new MutableCompositeModification();
+    private final MutableCompositeModification compositeModification = new MutableCompositeModification();
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
@@ -55,18 +58,12 @@ public class ShardWriteTransaction extends ShardTransaction {
     @Override
     public void handleReceive(Object message) throws Exception {
 
-        if (message instanceof WriteData) {
-            writeData(transaction, (WriteData) message, !SERIALIZED_REPLY);
-
-        } else if (message instanceof MergeData) {
-            mergeData(transaction, (MergeData) message, !SERIALIZED_REPLY);
-
-        } else if (message instanceof DeleteData) {
-            deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
-
+        if (message instanceof BatchedModifications) {
+            batchedModifications((BatchedModifications)message);
         } else if (message instanceof ReadyTransaction) {
             readyTransaction(transaction, !SERIALIZED_REPLY);
-
+        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readyTransaction(transaction, SERIALIZED_REPLY);
         } else if(WriteData.isSerializedType(message)) {
             writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
 
@@ -76,22 +73,32 @@ public class ShardWriteTransaction extends ShardTransaction {
         } else if(DeleteData.isSerializedType(message)) {
             deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
 
-        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(transaction, SERIALIZED_REPLY);
-
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
-            getSender().tell(new GetCompositeModificationReply(modification), getSelf());
+            getSender().tell(new GetCompositeModificationReply(compositeModification), getSelf());
         } else {
             super.handleReceive(message);
         }
     }
 
+    private void batchedModifications(BatchedModifications batched) {
+        try {
+            for(Modification modification: batched.getModifications()) {
+                compositeModification.addModification(modification);
+                modification.apply(transaction);
+            }
+
+            getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+        } catch (Exception e) {
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
+    }
+
     private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
             boolean returnSerialized) {
         LOG.debug("writeData at path : {}", message.getPath());
 
-        modification.addModification(
+        compositeModification.addModification(
                 new WriteModification(message.getPath(), message.getData()));
         try {
             transaction.write(message.getPath(), message.getData());
@@ -107,7 +114,7 @@ public class ShardWriteTransaction extends ShardTransaction {
             boolean returnSerialized) {
         LOG.debug("mergeData at path : {}", message.getPath());
 
-        modification.addModification(
+        compositeModification.addModification(
                 new MergeModification(message.getPath(), message.getData()));
 
         try {
@@ -124,7 +131,7 @@ public class ShardWriteTransaction extends ShardTransaction {
             boolean returnSerialized) {
         LOG.debug("deleteData at path : {}", message.getPath());
 
-        modification.addModification(new DeleteModification(message.getPath()));
+        compositeModification.addModification(new DeleteModification(message.getPath()));
         try {
             transaction.delete(message.getPath());
             DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
@@ -143,7 +150,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
         getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
-                cohort, modification, returnSerialized), getContext());
+                cohort, compositeModification, returnSerialized), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
         getSelf().tell(PoisonPill.getInstance(), getSelf());
index 03d1b3a6d736541dca754e49fa6f35ea67b7ed2b..1e222e4c0a667307ce9b7cf3c24585b7f61d5911 100644 (file)
@@ -15,18 +15,19 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -36,7 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
-final class TransactionContextImpl extends AbstractTransactionContext {
+class TransactionContextImpl extends AbstractTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
     private final ActorContext actorContext;
@@ -44,8 +45,9 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     private final ActorSelection actor;
     private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
-    private final OperationCompleter operationCompleter;
 
+    private final OperationCompleter operationCompleter;
+    private BatchedModifications batchedModifications;
 
     TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
             ActorContext actorContext, SchemaContext schemaContext,
@@ -69,13 +71,12 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         return actor;
     }
 
-    private Future<Object> executeOperationAsync(SerializableMessage msg) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+    protected short getRemoteTransactionVersion() {
+        return remoteTransactionVersion;
     }
 
-    private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
-                msg.toSerializable(remoteTransactionVersion)));
+    protected Future<Object> executeOperationAsync(SerializableMessage msg) {
+        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
     }
 
     @Override
@@ -90,6 +91,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
                 identifier, recordedOperationFutures.size());
 
+        // Send the remaining batched modifications if any.
+
+        sendBatchedModifications();
+
         // Send the ReadyTransaction message to the Tx actor.
 
         final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
@@ -155,25 +160,48 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
+    private void batchModification(Modification modification) {
+        if(batchedModifications == null) {
+            batchedModifications = new BatchedModifications(remoteTransactionVersion);
+        }
+
+        batchedModifications.addModification(modification);
+
+        if(batchedModifications.getModifications().size() >=
+                actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
+            sendBatchedModifications();
+        }
+    }
+
+    private void sendBatchedModifications() {
+        if(batchedModifications != null) {
+            LOG.debug("Tx {} sending {} batched modifications", identifier,
+                    batchedModifications.getModifications().size());
+
+            recordedOperationFutures.add(executeOperationAsync(batchedModifications));
+            batchedModifications = null;
+        }
+    }
+
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         LOG.debug("Tx {} deleteData called path = {}", identifier, path);
 
-        recordedOperationFutures.add(executeOperationAsync(new DeleteData(path)));
+        batchModification(new DeleteModification(path));
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} mergeData called path = {}", identifier, path);
 
-        recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
+        batchModification(new MergeModification(path, data));
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} writeData called path = {}", identifier, path);
 
-        recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
+        batchModification(new WriteModification(path, data));
     }
 
     @Override
@@ -182,6 +210,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         LOG.debug("Tx {} readData called path = {}", identifier, path);
 
+        // Send the remaining batched modifications if any.
+
+        sendBatchedModifications();
+
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail the read.
@@ -263,6 +295,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         LOG.debug("Tx {} dataExists called path = {}", identifier, path);
 
+        // Send the remaining batched modifications if any.
+
+        sendBatchedModifications();
+
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail this
index d63ec8010dc714adc71c951bf7fd0a65937a4ce7..58b37be2a2727babd9b0305e868d85d90c079052 100644 (file)
@@ -304,7 +304,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     private void throttleOperation(int acquirePermits) {
         try {
-            if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+            if(!operationLimiter.tryAcquire(acquirePermits,
+                    actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
             }
         } catch (InterruptedException e) {
@@ -689,7 +690,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
             String transactionPath = reply.getTransactionPath();
 
-            LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
+            LOG.debug("Tx {} Received {}", identifier, reply);
 
             ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
 
@@ -707,8 +708,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
-            return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
+                return new TransactionContextImpl(transactionPath, transactionActor, identifier,
+                    actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            } else {
+                return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier,
+                        actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            }
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
new file mode 100644 (file)
index 0000000..670641f
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+
+/**
+ * Message used to batch write, merge, delete modification operations to the  ShardTransaction actor.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModifications extends MutableCompositeModification implements SerializableMessage {
+    private static final long serialVersionUID = 1L;
+
+    public BatchedModifications() {
+    }
+
+    public BatchedModifications(short version) {
+        super(version);
+    }
+
+    @Override
+    public Object toSerializable() {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java
new file mode 100644 (file)
index 0000000..33c5733
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * The reply for the BatchedModifications message.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModificationsReply extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
+
+    private int numBatched;
+
+    public BatchedModificationsReply() {
+    }
+
+    public BatchedModificationsReply(int numBatched) {
+        this.numBatched = numBatched;
+    }
+
+
+    public int getNumBatched() {
+        return numBatched;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        numBatched = in.readInt();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeInt(numBatched);
+    }
+
+    @Override
+    public Object toSerializable() {
+        return this;
+    }
+}
index ffd0f1ccf3cfcc79598aad6d03f171fb2c70de61..c2bf81fa8e75fd0b5da2a700d37ab142e4815042 100644 (file)
@@ -57,4 +57,11 @@ public class CreateTransactionReply implements SerializableMessage {
                 (short)o.getMessageVersion());
     }
 
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("CreateTransactionReply [transactionPath=").append(transactionPath).append(", transactionId=")
+                .append(transactionId).append(", version=").append(version).append("]");
+        return builder.toString();
+    }
 }
index 04bc63c5a5e73506dc9835ae4abf36915372bcee..5ba787c98322eb155ce5971a91807a5c11d3dc66 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -18,18 +17,22 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class DeleteData implements VersionedSerializableMessage, Externalizable {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class DeleteData extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
     public static final Class<DeleteData> SERIALIZABLE_CLASS = DeleteData.class;
 
     private YangInstanceIdentifier path;
-    private short version;
 
     public DeleteData() {
     }
 
-    public DeleteData(final YangInstanceIdentifier path) {
+    public DeleteData(final YangInstanceIdentifier path, short version) {
+        super(version);
         this.path = path;
     }
 
@@ -37,26 +40,21 @@ public class DeleteData implements VersionedSerializableMessage, Externalizable
         return path;
     }
 
-    public short getVersion() {
-        return version;
-    }
-
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        version = in.readShort(); // Read the version - don't need to do anything with it now
+        super.readExternal(in);
         path = SerializationUtils.deserializePath(in);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(version);
+        super.writeExternal(out);
         SerializationUtils.serializePath(path, out);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            version = toVersion;
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             // To base or R1 Helium version
@@ -71,7 +69,8 @@ public class DeleteData implements VersionedSerializableMessage, Externalizable
         } else {
             // From base or R1 Helium version
             ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
-            return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+            return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
index 0c6ff0e68d69d05e56871d3e197795387310232d..dd21b0e2e64071576b141c8062761bfa9491ef6d 100644 (file)
@@ -10,7 +10,12 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
 public class DeleteDataReply extends EmptyReply {
+    private static final long serialVersionUID = 1L;
 
     private static final Object LEGACY_SERIALIZED_INSTANCE =
             ShardTransactionMessages.DeleteDataReply.newBuilder().build();
index 284c6eff8d3488037fd63ca76b4326228bb4e3c4..38a37f0ccfedbe32f162db088533cee388b912fb 100644 (file)
@@ -14,7 +14,7 @@ import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
  *
  * @author Thomas Pantelis
  */
-public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage {
+public abstract class EmptyReply extends EmptyExternalizable {
 
     private final Object legacySerializedInstance;
 
@@ -23,7 +23,6 @@ public abstract class EmptyReply extends EmptyExternalizable implements Versione
         this.legacySerializedInstance = legacySerializedInstance;
     }
 
-    @Override
     public Object toSerializable(short toVersion) {
         return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance;
     }
index ae0d630cf267aa75b0ef403b8df7e7733d06e082..0f44733503ead5c1eb2a2e84bab07c284978fd3e 100644 (file)
@@ -16,7 +16,11 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class MergeData extends ModifyData implements VersionedSerializableMessage {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class MergeData extends ModifyData {
     private static final long serialVersionUID = 1L;
 
     public static final Class<MergeData> SERIALIZABLE_CLASS = MergeData.class;
@@ -24,14 +28,13 @@ public class MergeData extends ModifyData implements VersionedSerializableMessag
     public MergeData() {
     }
 
-    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        super(path, data);
+    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+        super(path, data, version);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            setVersion(toVersion);
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             // To base or R1 Helium version
@@ -50,7 +53,8 @@ public class MergeData extends ModifyData implements VersionedSerializableMessag
             ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
             Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
                     o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-            return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode());
+            return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
index a4c514bdbf0751b7399ba5928c367089a8210e82..6936ef14c52e6dbac89e9379d5d8230bf51a1d59 100644 (file)
@@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
 public class MergeDataReply extends EmptyReply {
     private static final long serialVersionUID = 1L;
 
index 69c41c2a5663f5021a7a22401aef48ca9b3986d2..bbd090f9291ccde682b33a46e748e6f618352754 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -17,17 +16,21 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Ap
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public abstract class ModifyData implements Externalizable {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public abstract class ModifyData extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
     private YangInstanceIdentifier path;
     private NormalizedNode<?, ?> data;
-    private short version;
 
     protected ModifyData() {
     }
 
-    protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+    protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+        super(version);
         this.path = path;
         this.data = data;
     }
@@ -40,23 +43,15 @@ public abstract class ModifyData implements Externalizable {
         return data;
     }
 
-    public short getVersion() {
-        return version;
-    }
-
-    protected void setVersion(short version) {
-        this.version = version;
-    }
-
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        version = in.readShort();
+        super.readExternal(in);
         SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(version);
+        super.writeExternal(out);
         SerializationUtils.serializePathAndNode(path, data, out);
     }
 
index 8ac6e1b1494a68fd3e7f2f04e0081a4d7f538a10..b0c163d87f346ccaefc300ce38f88573ab033b17 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.protobuf.ByteString;
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -19,18 +18,18 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class ReadDataReply implements VersionedSerializableMessage, Externalizable {
+public class ReadDataReply extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
     public static final Class<ReadDataReply> SERIALIZABLE_CLASS = ReadDataReply.class;
 
     private NormalizedNode<?, ?> normalizedNode;
-    private short version;
 
     public ReadDataReply() {
     }
 
-    public ReadDataReply(NormalizedNode<?, ?> normalizedNode) {
+    public ReadDataReply(NormalizedNode<?, ?> normalizedNode, short version) {
+        super(version);
         this.normalizedNode = normalizedNode;
     }
 
@@ -40,20 +39,19 @@ public class ReadDataReply implements VersionedSerializableMessage, Externalizab
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        version = in.readShort();
+        super.readExternal(in);
         normalizedNode = SerializationUtils.deserializeNormalizedNode(in);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(version);
+        super.writeExternal(out);
         SerializationUtils.serializeNormalizedNode(normalizedNode, out);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            version = toVersion;
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             return toSerializableReadDataReply(normalizedNode);
@@ -78,7 +76,8 @@ public class ReadDataReply implements VersionedSerializableMessage, Externalizab
         } else {
             ShardTransactionMessages.ReadDataReply o =
                     (ShardTransactionMessages.ReadDataReply) serializable;
-            return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()));
+            return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java
new file mode 100644 (file)
index 0000000..2a660fa
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Abstract base class for a versioned Externalizable message.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage {
+    private static final long serialVersionUID = 1L;
+
+    private short version;
+
+    public VersionedExternalizableMessage() {
+    }
+
+    public VersionedExternalizableMessage(short version) {
+        this.version = version;
+    }
+
+    public short getVersion() {
+        return version;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        version = in.readShort();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(version);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java
deleted file mode 100644 (file)
index 5c30b10..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-/**
- * Interface for a Serializable message with versioning.
- *
- * @author Thomas Pantelis
- */
-public interface VersionedSerializableMessage {
-    Object toSerializable(short toVersion);
-}
index 989949c88fb0f0c5400ea903119435a6d4ed0940..a4f648b6b3ccb1a99fe1f3c66241f6801c4290bc 100644 (file)
@@ -16,7 +16,11 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class WriteData extends ModifyData implements VersionedSerializableMessage {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class WriteData extends ModifyData {
     private static final long serialVersionUID = 1L;
 
     public static final Class<WriteData> SERIALIZABLE_CLASS = WriteData.class;
@@ -24,14 +28,13 @@ public class WriteData extends ModifyData implements VersionedSerializableMessag
     public WriteData() {
     }
 
-    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        super(path, data);
+    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+        super(path, data, version);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            setVersion(toVersion);
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             // To base or R1 Helium version
@@ -50,7 +53,8 @@ public class WriteData extends ModifyData implements VersionedSerializableMessag
             ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
             Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
                     o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-            return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode());
+            return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
index 8255828819cd494a93fc61dacd32bc48ba55edba..3455571a518f1bf656369a23f2ea202ee53d6271 100644 (file)
@@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
 public class WriteDataReply extends EmptyReply {
     private static final long serialVersionUID = 1L;
 
index f04d00440405deab7e5a8be141fd624fd3c30f35..77f0858d7b1ef9119fb5b2bff9039de0d30e4536 100644 (file)
@@ -17,8 +17,10 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public abstract class AbstractModification implements Modification {
 
     private YangInstanceIdentifier path;
+    private short version;
 
-    protected AbstractModification() {
+    protected AbstractModification(short version) {
+        this.version = version;
     }
 
     protected AbstractModification(YangInstanceIdentifier path) {
@@ -32,4 +34,8 @@ public abstract class AbstractModification implements Modification {
     public YangInstanceIdentifier getPath() {
         return path;
     }
+
+    public short getVersion() {
+        return version;
+    }
 }
index 833f86fb981f1179ce326c4d3703f17c3449aa73..3a63f5b17361a0015ac43ba293bcd54ee29339cb 100644 (file)
@@ -25,6 +25,11 @@ public class DeleteModification extends AbstractModification {
     private static final long serialVersionUID = 1L;
 
     public DeleteModification() {
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public DeleteModification(short version) {
+        super(version);
     }
 
     public DeleteModification(YangInstanceIdentifier path) {
@@ -43,13 +48,11 @@ public class DeleteModification extends AbstractModification {
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        in.readShort();
         setPath(SerializationUtils.deserializePath(in));
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(DataStoreVersions.CURRENT_VERSION);
         SerializationUtils.serializePath(getPath(), out);
     }
 
@@ -66,8 +69,9 @@ public class DeleteModification extends AbstractModification {
         return new DeleteModification(InstanceIdentifierUtils.fromSerializable(o.getPath()));
     }
 
-    public static DeleteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
-        DeleteModification mod = new DeleteModification();
+    public static DeleteModification fromStream(ObjectInput in, short version)
+            throws ClassNotFoundException, IOException {
+        DeleteModification mod = new DeleteModification(version);
         mod.readExternal(in);
         return mod;
     }
index 571443eedd3a89f2ce9d474cbfbd883919a8d7ff..7ba74f4e7ff63b42e82d81ab6e82f7f22b1ede6f 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.modification;
 
 import java.io.IOException;
 import java.io.ObjectInput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
@@ -24,6 +25,11 @@ public class MergeModification extends WriteModification {
     private static final long serialVersionUID = 1L;
 
     public MergeModification() {
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public MergeModification(short version) {
+        super(version);
     }
 
     public MergeModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
@@ -47,8 +53,9 @@ public class MergeModification extends WriteModification {
         return new MergeModification(decoded.getDecodedPath(), decoded.getDecodedNode());
     }
 
-    public static MergeModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
-        MergeModification mod = new MergeModification();
+    public static MergeModification fromStream(ObjectInput in, short version)
+            throws ClassNotFoundException, IOException {
+        MergeModification mod = new MergeModification(version);
         mod.readExternal(in);
         return mod;
     }
index 5d7947b19fc6ddaeafe133ec546dfc879dd07855..b597742319f08a2c04a8b633baf6d525e97dff14 100644 (file)
@@ -27,10 +27,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 public class MutableCompositeModification implements CompositeModification {
     private static final long serialVersionUID = 1L;
 
-    private final List<Modification> modifications;
+    private final List<Modification> modifications = new ArrayList<>();
+    private short version;
 
     public MutableCompositeModification() {
-        modifications = new ArrayList<>();
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public MutableCompositeModification(short version) {
+        this.version = version;
     }
 
     @Override
@@ -45,6 +50,14 @@ public class MutableCompositeModification implements CompositeModification {
         return COMPOSITE;
     }
 
+    public short getVersion() {
+        return version;
+    }
+
+    public void setVersion(short version) {
+        this.version = version;
+    }
+
     /**
      * Add a new Modification to the list of Modifications represented by this
      * composite
@@ -62,7 +75,7 @@ public class MutableCompositeModification implements CompositeModification {
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        in.readShort();
+        version = in.readShort();
 
         int size = in.readInt();
 
@@ -75,15 +88,15 @@ public class MutableCompositeModification implements CompositeModification {
                 byte type = in.readByte();
                 switch(type) {
                 case Modification.WRITE:
-                    modifications.add(WriteModification.fromStream(in));
+                    modifications.add(WriteModification.fromStream(in, version));
                     break;
 
                 case Modification.MERGE:
-                    modifications.add(MergeModification.fromStream(in));
+                    modifications.add(MergeModification.fromStream(in, version));
                     break;
 
                 case Modification.DELETE:
-                    modifications.add(DeleteModification.fromStream(in));
+                    modifications.add(DeleteModification.fromStream(in, version));
                     break;
                 }
             }
@@ -94,7 +107,7 @@ public class MutableCompositeModification implements CompositeModification {
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(DataStoreVersions.CURRENT_VERSION);
+        out.writeShort(version);
 
         out.writeInt(modifications.size());
 
@@ -121,8 +134,7 @@ public class MutableCompositeModification implements CompositeModification {
         builder.setTimeStamp(System.nanoTime());
 
         for (Modification m : modifications) {
-            builder.addModification(
-                    (PersistentMessages.Modification) m.toSerializable());
+            builder.addModification((PersistentMessages.Modification) m.toSerializable());
         }
 
         return builder.build();
index 9c122c9adeef8a14cf05bfa877d38a5cbe310ae2..2fdca5f3792161400bf5e8cbbb0f4e13222bcad5 100644 (file)
@@ -31,6 +31,11 @@ public class WriteModification extends AbstractModification {
     private NormalizedNode<?, ?> data;
 
     public WriteModification() {
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public WriteModification(short version) {
+        super(version);
     }
 
     public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
@@ -54,14 +59,11 @@ public class WriteModification extends AbstractModification {
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        in.readShort(); // version
-
         SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(DataStoreVersions.CURRENT_VERSION);
         SerializationUtils.serializePathAndNode(getPath(), data, out);
     }
 
@@ -81,8 +83,9 @@ public class WriteModification extends AbstractModification {
         return new WriteModification(decoded.getDecodedPath(), decoded.getDecodedNode());
     }
 
-    public static WriteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
-        WriteModification mod = new WriteModification();
+    public static WriteModification fromStream(ObjectInput in, short version)
+            throws ClassNotFoundException, IOException {
+        WriteModification mod = new WriteModification(version);
         mod.readExternal(in);
         return mod;
     }
index 7e8307465b9818d3c886098219065c9276b77e65..75e45b1d4f856b6036674096409d39d72fb51c8b 100644 (file)
@@ -1,12 +1,10 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.osgi.framework.BundleContext;
-
 import scala.concurrent.duration.Duration;
 
 public class DistributedConfigDataStoreProviderModule extends
@@ -68,6 +66,7 @@ public class DistributedConfigDataStoreProviderModule extends
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
                 .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
+                .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
index 0655468531a16fe7a9ad2032a5f56d6f948044e3..f5d46a9a7e2da12054f942c5c7d84297532db1b7 100644 (file)
@@ -1,12 +1,10 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.osgi.framework.BundleContext;
-
 import scala.concurrent.duration.Duration;
 
 public class DistributedOperationalDataStoreProviderModule extends
@@ -68,6 +66,7 @@ public class DistributedOperationalDataStoreProviderModule extends
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
                 .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
+                .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
index e2ee7373d0cfd3c4a68ec98a221fd1a121f4f13c..46918890b65b65f1b86b37194c7f91cda144ea17 100644 (file)
@@ -156,6 +156,15 @@ module distributed-datastore-provider {
                           an operation (eg transaction create).";
          }
 
+         leaf shard-batched-modification-count {
+            default 100;
+            type non-zero-uint32-type;
+            description "The number of transaction modification operations (put, merge, delete) to
+                        batch before sending to the shard transaction actor. Batching improves
+                        performance as less modifications messages are sent to the actor and thus
+                        lessens the chance that the transaction actor's mailbox queue could get full.";
+         }
+
          leaf enable-metric-capture {
             default false;
             type boolean;
index d3a3a8fc2df812b88ffc500c71e17d224189b541..0099b58dfbe92370dffa5a8e5ad0263b55abf8c9 100644 (file)
@@ -32,6 +32,7 @@ public class DatastoreContextTest {
         assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
         assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
         assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT, build.getShardBatchedModificationCount());
     }
 
-}
\ No newline at end of file
+}
index 66fa876277ca97a164a5b433fa3e11ecb50e69a7..4ec035ee3b52308e6390d9a2e4dece6703438318 100644 (file)
@@ -25,6 +25,7 @@ public class DistributedDataStoreTest extends AbstractActorTest {
         schemaContext = TestModel.createTestContext();
 
         doReturn(schemaContext).when(actorContext).getSchemaContext();
+        doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
     }
 
     @Test
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java
new file mode 100644 (file)
index 0000000..e7afe26
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import java.util.concurrent.Semaphore;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+
+/**
+ * Unit tests for OperationCompleter.
+ *
+ * @author Thomas Pantelis
+ */
+public class OperationCompleterTest {
+
+    @Test
+    public void testOnComplete() throws Exception {
+        int permits = 10;
+        Semaphore operationLimiter = new Semaphore(permits);
+        operationLimiter.acquire(permits);
+        int availablePermits = 0;
+
+        OperationCompleter completer = new OperationCompleter(operationLimiter );
+
+        completer.onComplete(null, new DataExistsReply(true));
+        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+        completer.onComplete(null, new DataExistsReply(true));
+        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+        completer.onComplete(null, new IllegalArgumentException());
+        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+        completer.onComplete(null, new BatchedModificationsReply(4));
+        availablePermits += 4;
+        assertEquals("availablePermits", availablePermits, operationLimiter.availablePermits());
+    }
+}
index 58cec67a2d6cce1f30e3cdf41fdaf7b7d23185a7..1d1b08b5f8d57db68e9690cf1045fec5cff10bcc 100644 (file)
@@ -79,8 +79,8 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
-                            DataStoreVersions.BASE_HELIUM_VERSION), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.BASE_HELIUM_VERSION).
+                        toSerializable(), getRef());
 
             expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
 
@@ -153,9 +153,11 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                    DataStoreVersions.BASE_HELIUM_VERSION).toSerializable(), getRef());
 
-            expectMsgClass(duration, WriteDataReply.class);
+            expectMsgClass(duration, WriteDataReply.INSTANCE.toSerializable(
+                    DataStoreVersions.BASE_HELIUM_VERSION).getClass());
 
             // Ready the Tx
 
index 851fb0114b3a64c7d211a5c6375c14c41e715e69..2973d277f5d8a4d4ce4576ba8dd8bc977295a1c8 100644 (file)
@@ -14,10 +14,14 @@ import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
@@ -46,9 +50,11 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
@@ -250,8 +256,8 @@ public class ShardTransactionTest extends AbstractActorTest {
                     "testOnReceiveWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
-                            DataStoreVersions.HELIUM_2_VERSION), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
+                        toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
 
@@ -259,7 +265,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
             // unserialized write
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
@@ -293,8 +299,8 @@ public class ShardTransactionTest extends AbstractActorTest {
                     "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
-                            DataStoreVersions.HELIUM_2_VERSION), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
+                        toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
 
@@ -302,7 +308,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
             //unserialized merge
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
@@ -335,20 +341,73 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
                     "testDeleteData");
 
-            transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(
-                    DataStoreVersions.HELIUM_2_VERSION), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
+                    toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
 
             assertModification(transaction, DeleteModification.class);
 
             //unserialized
-            transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
 
             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveBatchedModifications() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+            final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
+
+            YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+            NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                    withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+            YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+            NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+            YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+            BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+            batched.addModification(new WriteModification(writePath, writeData));
+            batched.addModification(new MergeModification(mergePath, mergeData));
+            batched.addModification(new DeleteModification(deletePath));
+
+            transaction.tell(batched, getRef());
+
+            BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+            assertEquals("getNumBatched", 3, reply.getNumBatched());
+
+            JavaTestKit verification = new JavaTestKit(getSystem());
+            transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
+
+            CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
+                        GetCompositeModificationReply.class).getModification();
+
+            assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
+
+            WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
+            assertEquals("getPath", writePath, write.getPath());
+            assertEquals("getData", writeData, write.getData());
+
+            MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
+            assertEquals("getPath", mergePath, merge.getPath());
+            assertEquals("getData", mergeData, merge.getData());
+
+            DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
+            assertEquals("getPath", deletePath, delete.getPath());
+
+            InOrder inOrder = Mockito.inOrder(mockWriteTx);
+            inOrder.verify(mockWriteTx).write(writePath, writeData);
+            inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
+            inOrder.verify(mockWriteTx).delete(deletePath);
+        }};
+    }
 
     @Test
     public void testOnReceiveReadyTransaction() throws Exception {
@@ -463,8 +522,8 @@ public class ShardTransactionTest extends AbstractActorTest {
                 DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
-        transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(
-                DataStoreVersions.CURRENT_VERSION), ActorRef.noSender());
+        transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
+                toSerializable(), ActorRef.noSender());
     }
 
     @Test
index 23c3a82a38255e9bfff2708583c4370ccb6ccf9a..88ab0dd292b4894f0eac4c8ae782452d9d696471 100644 (file)
@@ -43,6 +43,7 @@ public class TransactionChainProxyTest extends AbstractActorTest{
         actorContext.setSchemaContext(schemaContext);
 
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+        doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext();
     }
 
     @SuppressWarnings("resource")
index fa2f9187d6059f1585a1475531556d8563db3c5a..6573308c12100914badbedc5d0296b90e096a2b7 100644 (file)
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -39,13 +40,18 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -60,6 +66,11 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -71,6 +82,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -102,7 +114,10 @@ public class TransactionProxyTest {
     @Mock
     private ClusterWrapper mockClusterWrapper;
 
-    String memberName = "mock-member";
+    private final String memberName = "mock-member";
+
+    private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
+            shardBatchedModificationCount(1);
 
     @BeforeClass
     public static void setUpClass() throws IOException {
@@ -126,15 +141,13 @@ public class TransactionProxyTest {
 
         schemaContext = TestModel.createTestContext();
 
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
-
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
-        doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+        doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         ShardStrategyFactory.setConfiguration(configuration);
@@ -187,11 +200,15 @@ public class TransactionProxyTest {
     }
 
     private ReadData eqSerializedReadData() {
+        return eqSerializedReadData(TestModel.TEST_PATH);
+    }
+
+    private ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
             @Override
             public boolean matches(Object argument) {
                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+                       ReadData.fromSerializable(argument).getPath().equals(path);
             }
         };
 
@@ -210,23 +227,13 @@ public class TransactionProxyTest {
         return argThat(matcher);
     }
 
-    private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
-        return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
-            final int transactionVersion) {
+    private WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
             @Override
             public boolean matches(Object argument) {
-                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
-                        WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
-                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
-                           ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
-
+                if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) {
                     WriteData obj = WriteData.fromSerializable(argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                           obj.getData().equals(nodeToWrite);
+                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
                 }
 
                 return false;
@@ -236,39 +243,13 @@ public class TransactionProxyTest {
         return argThat(matcher);
     }
 
-    private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
-        ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(argument instanceof WriteData) {
-                    WriteData obj = (WriteData) argument;
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                        obj.getData().equals(nodeToWrite);
-                }
-                return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
-        return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
-            final int transactionVersion) {
+    private MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
             @Override
             public boolean matches(Object argument) {
-                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
-                        MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
-                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
-                           ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
-
+                if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) {
                     MergeData obj = MergeData.fromSerializable(argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                           obj.getData().equals(nodeToWrite);
+                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
                 }
 
                 return false;
@@ -278,41 +259,12 @@ public class TransactionProxyTest {
         return argThat(matcher);
     }
 
-    private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
-        ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(argument instanceof MergeData) {
-                    MergeData obj = ((MergeData) argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                        obj.getData().equals(nodeToWrite);
-                }
-
-               return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private DeleteData eqSerializedDeleteData() {
-        ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-        private DeleteData eqDeleteData() {
+    private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
             @Override
             public boolean matches(Object argument) {
-                return argument instanceof DeleteData &&
-                    ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
+                return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) &&
+                       DeleteData.fromSerializable(argument).getPath().equals(expPath);
             }
         };
 
@@ -329,7 +281,7 @@ public class TransactionProxyTest {
 
     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
             short transactionVersion) {
-        return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+        return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable());
     }
 
     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
@@ -337,7 +289,7 @@ public class TransactionProxyTest {
     }
 
     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(data));
+        return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
     }
 
     private Future<Object> dataExistsSerializedReply(boolean exists) {
@@ -348,48 +300,41 @@ public class TransactionProxyTest {
         return Futures.successful(new DataExistsReply(exists));
     }
 
-    private Future<Object> writeSerializedDataReply(short version) {
-        return Futures.successful(new WriteDataReply().toSerializable(version));
-    }
-
-    private Future<Object> writeSerializedDataReply() {
-        return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private Future<WriteDataReply> writeDataReply() {
-        return Futures.successful(new WriteDataReply());
-    }
-
-    private Future<Object> mergeSerializedDataReply(short version) {
-        return Futures.successful(new MergeDataReply().toSerializable(version));
-    }
-
-    private Future<Object> mergeSerializedDataReply() {
-        return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+    private Future<BatchedModificationsReply> batchedModificationsReply(int count) {
+        return Futures.successful(new BatchedModificationsReply(count));
     }
 
     private Future<Object> incompleteFuture(){
         return mock(Future.class);
     }
 
-    private Future<MergeDataReply> mergeDataReply() {
-        return Futures.successful(new MergeDataReply());
+    private ActorSelection actorSelection(ActorRef actorRef) {
+        return getSystem().actorSelection(actorRef.path());
+    }
+
+    private void expectBatchedModifications(ActorRef actorRef, int count) {
+        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
-    private Future<Object> deleteSerializedDataReply(short version) {
-        return Futures.successful(new DeleteDataReply().toSerializable(version));
+    private void expectBatchedModifications(int count) {
+        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
     }
 
-    private Future<Object> deleteSerializedDataReply() {
-        return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+    private void expectIncompleteBatchedModifications() {
+        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
     }
 
-    private Future<DeleteDataReply> deleteDataReply() {
-        return Futures.successful(new DeleteDataReply());
+    private void expectReadyTransaction(ActorRef actorRef) {
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
     }
 
-    private ActorSelection actorSelection(ActorRef actorRef) {
-        return getSystem().actorSelection(actorRef.path());
+    private void expectFailedBatchedModifications(ActorRef actorRef) {
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
     private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
@@ -446,8 +391,7 @@ public class TransactionProxyTest {
     public void testRead() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
@@ -476,8 +420,7 @@ public class TransactionProxyTest {
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
     }
@@ -489,8 +432,7 @@ public class TransactionProxyTest {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
     }
@@ -541,21 +483,19 @@ public class TransactionProxyTest {
 
     @Test(expected = TestException.class)
     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
+                when(mockActorContext).getDatastoreContext();
+
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        expectFailedBatchedModifications(actorRef);
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -575,14 +515,12 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
 
@@ -590,16 +528,19 @@ public class TransactionProxyTest {
                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
-
         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+
+        InOrder inOrder = Mockito.inOrder(mockActorContext);
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
     }
 
     @Test(expected=IllegalStateException.class)
     public void testReadPreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
-
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
         transactionProxy.read(TestModel.TEST_PATH);
     }
 
@@ -625,8 +566,7 @@ public class TransactionProxyTest {
     public void testExists() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
@@ -673,23 +613,21 @@ public class TransactionProxyTest {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
     }
 
     @Test(expected = TestException.class)
     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
+                when(mockActorContext).getDatastoreContext();
+
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        expectFailedBatchedModifications(actorRef);
 
         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
@@ -715,28 +653,30 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", true, exists);
+
+        InOrder inOrder = Mockito.inOrder(mockActorContext);
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedDataExists());
     }
 
     @Test(expected=IllegalStateException.class)
     public void testExistsPreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
-
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
         transactionProxy.exists(TestModel.TEST_PATH);
     }
 
@@ -757,7 +697,7 @@ public class TransactionProxyTest {
                     // Expected
                 }
             } else {
-                assertEquals("Recording operation Future result type", expResultType,
+                assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
             }
         }
@@ -769,19 +709,20 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     @Test
@@ -796,10 +737,10 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
@@ -833,33 +774,28 @@ public class TransactionProxyTest {
             throw caughtEx.get();
         }
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     @Test(expected=IllegalStateException.class)
     public void testWritePreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
-
-        transactionProxy.write(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
     }
 
     @Test(expected=IllegalStateException.class)
     public void testWriteAfterReadyPreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.ready();
 
-        transactionProxy.write(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
     }
 
     @Test
@@ -868,37 +804,40 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     @Test
     public void testDelete() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
-        doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                DeleteDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
@@ -935,14 +874,10 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -955,9 +890,12 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.class);
+                BatchedModificationsReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
+                isA(BatchedModifications.class));
     }
 
     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
@@ -969,14 +907,16 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+        doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
 
-        doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
+        doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+        doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
+
+        expectReadyTransaction(actorRef);
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
@@ -993,6 +933,8 @@ public class TransactionProxyTest {
 
         transactionProxy.merge(TestModel.TEST_PATH, testNode);
 
+        transactionProxy.delete(TestModel.TEST_PATH);
+
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
@@ -1000,7 +942,8 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
+                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class,
+                ShardTransactionMessages.DeleteDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
@@ -1029,21 +972,13 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        expectFailedBatchedModifications(actorRef);
 
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+        expectReadyTransaction(actorRef);
 
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
-
-        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1055,8 +990,7 @@ public class TransactionProxyTest {
 
         verifyCohortFutures(proxy, TestException.class);
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.class, TestException.class);
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
     }
 
     @Test
@@ -1065,15 +999,13 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)),
                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1084,7 +1016,7 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.class);
+                BatchedModificationsReply.class);
 
         verifyCohortFutures(proxy, TestException.class);
     }
@@ -1095,8 +1027,7 @@ public class TransactionProxyTest {
         doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
                 mockActorContext).findPrimaryShardAsync(anyString());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -1121,15 +1052,13 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)),
                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1160,8 +1089,7 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -1197,9 +1125,7 @@ public class TransactionProxyTest {
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-            .setTransactionId("txn-1")
-            .setTransactionActorPath(actorPath)
-            .build();
+            .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
@@ -1240,7 +1166,7 @@ public class TransactionProxyTest {
     }
 
     @Test
-    public void testLocalTxActorWrite() throws Exception {
+    public void testLocalTxActorReady() throws Exception {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
@@ -1251,48 +1177,26 @@ public class TransactionProxyTest {
             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-            .setTransactionId("txn-1")
-            .setTransactionActorPath(actorPath)
-            .build();
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+            setTransactionId("txn-1").setTransactionActorPath(actorPath).
+            setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
-        executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+            executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                 eqCreateTransaction(memberName, WRITE_ONLY));
 
         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
 
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqWriteData(nodeToWrite));
+        doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        verify(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqWriteData(nodeToWrite));
-
-        //testing local merge
-        doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqMergeData(nodeToWrite));
-
-        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
-        verify(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqMergeData(nodeToWrite));
-
-
-        //testing local delete
-        doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqDeleteData());
 
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-            WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
+                BatchedModificationsReply.class);
 
         // testing ready
         doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
@@ -1333,10 +1237,9 @@ public class TransactionProxyTest {
         }
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-                .setTransactionId("txn-1")
-                .setTransactionActorPath(actorPath)
-                .build();
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+                setTransactionId("txn-1").setTransactionActorPath(actorPath).
+                setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
@@ -1352,9 +1255,9 @@ public class TransactionProxyTest {
 
         long end = System.nanoTime();
 
-        Assert.assertTrue(String.format("took less time than expected %s was %s",
-                TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
-                (end-start)), (end - start) > TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+                expected, (end-start)), (end - start) > expected);
 
     }
 
@@ -1380,10 +1283,9 @@ public class TransactionProxyTest {
         }
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-                .setTransactionId("txn-1")
-                .setTransactionActorPath(actorPath)
-                .build();
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+                setTransactionId("txn-1").setTransactionActorPath(actorPath).
+                setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
@@ -1399,9 +1301,9 @@ public class TransactionProxyTest {
 
         long end = System.nanoTime();
 
-        Assert.assertTrue(String.format("took more time than expected %s was %s",
-                TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
-                (end-start)), (end - start) <= TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+                expected, (end-start)), (end - start) <= expected);
     }
 
     public void testWriteThrottling(boolean shardFound){
@@ -1411,8 +1313,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectBatchedModifications(2);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1428,15 +1329,13 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectIncompleteBatchedModifications();
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
             }
         });
-
     }
 
     @Test
@@ -1447,8 +1346,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectBatchedModifications(2);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1466,15 +1364,13 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+                expectBatchedModifications(2);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
             }
         });
-
     }
 
     @Test
@@ -1485,8 +1381,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+                expectIncompleteBatchedModifications();
 
                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
 
@@ -1503,8 +1398,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+                expectBatchedModifications(2);
 
                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
 
@@ -1520,8 +1414,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+                expectBatchedModifications(2);
 
                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
 
@@ -1537,8 +1430,7 @@ public class TransactionProxyTest {
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqDeleteData());
+                expectIncompleteBatchedModifications();
 
                 transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -1554,8 +1446,7 @@ public class TransactionProxyTest {
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqDeleteData());
+                expectBatchedModifications(2);
 
                 transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -1569,8 +1460,7 @@ public class TransactionProxyTest {
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
-                doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqDeleteData());
+                expectBatchedModifications(2);
 
                 transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -1688,8 +1578,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectBatchedModifications(1);
 
                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
                         any(ActorSelection.class), any(ReadyTransaction.class));
@@ -1710,11 +1599,7 @@ public class TransactionProxyTest {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
 
-                doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
-
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(carsNode));
+                expectBatchedModifications(2);
 
                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
                         any(ActorSelection.class), any(ReadyTransaction.class));
@@ -1727,4 +1612,203 @@ public class TransactionProxyTest {
             }
         }, 2, true);
     }
+
+    @Test
+    public void testModificationOperationBatching() throws Throwable {
+        int shardBatchedModificationCount = 3;
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
+                when(mockActorContext).getDatastoreContext();
+
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        expectBatchedModifications(actorRef, shardBatchedModificationCount);
+
+        expectReadyTransaction(actorRef);
+
+        YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
+        NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+        YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
+        NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+        YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
+        YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.write(writePath1, writeNode1);
+        transactionProxy.write(writePath2, writeNode2);
+        transactionProxy.delete(deletePath1);
+        transactionProxy.merge(mergePath1, mergeNode1);
+        transactionProxy.merge(mergePath2, mergeNode2);
+        transactionProxy.write(writePath3, writeNode3);
+        transactionProxy.merge(mergePath3, mergeNode3);
+        transactionProxy.delete(deletePath2);
+
+        // This sends the last batch.
+        transactionProxy.ready();
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+                new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
+
+        verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+                new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
+
+        verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
+                new DeleteModification(deletePath2));
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+    }
+
+    @Test
+    public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
+        int shardBatchedModificationCount = 10;
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
+                when(mockActorContext).getDatastoreContext();
+
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        expectBatchedModifications(actorRef, shardBatchedModificationCount);
+
+        YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
+        NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+        YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
+
+        doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+
+        doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+
+        doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedDataExists());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.write(writePath1, writeNode1);
+        transactionProxy.write(writePath2, writeNode2);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
+                get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
+
+        transactionProxy.merge(mergePath1, mergeNode1);
+        transactionProxy.merge(mergePath2, mergeNode2);
+
+        readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
+
+        transactionProxy.delete(deletePath);
+
+        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+        assertEquals("Exists response", true, exists);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+                new WriteModification(writePath2, writeNode2));
+
+        verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+                new MergeModification(mergePath2, mergeNode2));
+
+        verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
+
+        InOrder inOrder = Mockito.inOrder(mockActorContext);
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedDataExists());
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+    }
+
+    private List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
+        ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
+                ArgumentCaptor.forClass(BatchedModifications.class);
+        verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
+                eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
+
+        List<BatchedModifications> batchedModifications = filterCaptured(
+                batchedModificationsCaptor, BatchedModifications.class);
+        return batchedModifications;
+    }
+
+    private <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
+        List<T> captured = new ArrayList<>();
+        for(T c: captor.getAllValues()) {
+            if(type.isInstance(c)) {
+                captured.add(c);
+            }
+        }
+
+        return captured;
+    }
+
+    private void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), expected);
+    }
+
+    private void verifyBatchedModifications(Object message, Modification... expected) {
+        assertEquals("Message type", BatchedModifications.class, message.getClass());
+        BatchedModifications batchedModifications = (BatchedModifications)message;
+        assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+        for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
+            Modification actual = batchedModifications.getModifications().get(i);
+            assertEquals("Modification type", expected[i].getClass(), actual.getClass());
+            assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
+                    ((AbstractModification)actual).getPath());
+            if(actual instanceof WriteModification) {
+                assertEquals("getData", ((WriteModification)expected[i]).getData(),
+                        ((WriteModification)actual).getData());
+            }
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java
new file mode 100644 (file)
index 0000000..15d2eea
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for BatchedModifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModificationsTest {
+
+    @Test
+    public void testSerialization() {
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+        YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+        BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(writePath, writeData));
+        batched.addModification(new MergeModification(mergePath, mergeData));
+        batched.addModification(new DeleteModification(deletePath));
+
+        BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
+                (Serializable) batched.toSerializable());
+
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+
+        assertEquals("getModifications size", 3, clone.getModifications().size());
+
+        WriteModification write = (WriteModification)clone.getModifications().get(0);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion());
+        assertEquals("getPath", writePath, write.getPath());
+        assertEquals("getData", writeData, write.getData());
+
+        MergeModification merge = (MergeModification)clone.getModifications().get(1);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion());
+        assertEquals("getPath", mergePath, merge.getPath());
+        assertEquals("getData", mergeData, merge.getData());
+
+        DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
+        assertEquals("getPath", deletePath, delete.getPath());
+    }
+
+    @Test
+    public void testBatchedModificationsReplySerialization() {
+        BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
+                (Serializable) new BatchedModificationsReply(100).toSerializable());
+        assertEquals("getNumBatched", 100, clone.getNumBatched());
+    }
+}
index e950b78ab7d609a6b0aac6e34431cbb5a3f36584..97bade152e0484dd832aacf4c962cc58b6c77d11 100644 (file)
@@ -22,21 +22,22 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  *
  * @author Thomas Pantelis
  */
+@Deprecated
 public class DeleteDataTest {
 
     @Test
     public void testSerialization() {
         YangInstanceIdentifier path = TestModel.TEST_PATH;
 
-        DeleteData expected = new DeleteData(path);
+        DeleteData expected = new DeleteData(path, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", DeleteData.class, serialized.getClass());
         assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)serialized).getVersion());
 
         Object clone = SerializationUtils.clone((Serializable) serialized);
-        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)clone).getVersion());
         DeleteData actual = DeleteData.fromSerializable(clone);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getPath", expected.getPath(), actual.getPath());
     }
 
@@ -58,9 +59,9 @@ public class DeleteDataTest {
     public void testSerializationWithHeliumR1Version() throws Exception {
         YangInstanceIdentifier path = TestModel.TEST_PATH;
 
-        DeleteData expected = new DeleteData(path);
+        DeleteData expected = new DeleteData(path, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.DeleteData.class, serialized.getClass());
 
         DeleteData actual = DeleteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
index 5b40afdff8288ff714cc4f9b4a398e2f2724369b..011d22798e628ee4800f980a4718667781ffdb16 100644 (file)
@@ -14,6 +14,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
+@Deprecated
 public class MergeDataTest {
 
     @Test
@@ -23,15 +24,15 @@ public class MergeDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        MergeData expected = new MergeData(path, data);
+        MergeData expected = new MergeData(path, data, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", MergeData.class, serialized.getClass());
         assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion());
 
         Object clone = SerializationUtils.clone((Serializable) serialized);
-        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion());
         MergeData actual = MergeData.fromSerializable(clone);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getPath", expected.getPath(), actual.getPath());
         assertEquals("getData", expected.getData(), actual.getData());
     }
@@ -58,9 +59,9 @@ public class MergeDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        MergeData expected = new MergeData(path, data);
+        MergeData expected = new MergeData(path, data, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass());
 
         MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
index 8ce73296c14cd0af90491ae4f87e99fa85637c5e..7ad45a61702607cdaf03a61b23573575d54830cd 100644 (file)
@@ -32,13 +32,14 @@ public class ReadDataReplyTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        ReadDataReply expected = new ReadDataReply(data);
+        ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ReadDataReply.class, serialized.getClass());
 
         ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
                 (Serializable) serialized));
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
     }
 
@@ -60,9 +61,9 @@ public class ReadDataReplyTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        ReadDataReply expected = new ReadDataReply(data);
+        ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass());
 
         ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
index 90a76f229e1ddd045c084e56b1b854a518b43a92..8148c9ca9528fc709ad1ecaf3febbda265bc033d 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableCo
  *
  * @author Thomas Pantelis
  */
+@Deprecated
 public class WriteDataTest {
 
     @Test
@@ -35,15 +36,15 @@ public class WriteDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteData expected = new WriteData(path, data);
+        WriteData expected = new WriteData(path, data, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", WriteData.class, serialized.getClass());
         assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion());
 
         Object clone = SerializationUtils.clone((Serializable) serialized);
-        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion());
         WriteData actual = WriteData.fromSerializable(clone);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getPath", expected.getPath(), actual.getPath());
         assertEquals("getData", expected.getData(), actual.getData());
     }
@@ -69,9 +70,9 @@ public class WriteDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteData expected = new WriteData(path, data);
+        WriteData expected = new WriteData(path, data, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass());
 
         WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
index b9d44b2586f75a34b163322f9888b94419dd7cee..e2acaa8d10789d2d1e1f2b5a6f127b5a1c7a3d9a 100644 (file)
@@ -7,6 +7,7 @@ import com.google.common.base.Stopwatch;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -53,17 +54,22 @@ public class MutableCompositeModificationTest extends AbstractModificationTest {
 
         MutableCompositeModification clone = (MutableCompositeModification) SerializationUtils.clone(compositeModification);
 
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+
         assertEquals("getModifications size", 3, clone.getModifications().size());
 
         WriteModification write = (WriteModification)clone.getModifications().get(0);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion());
         assertEquals("getPath", writePath, write.getPath());
         assertEquals("getData", writeData, write.getData());
 
         MergeModification merge = (MergeModification)clone.getModifications().get(1);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion());
         assertEquals("getPath", mergePath, merge.getPath());
         assertEquals("getData", mergeData, merge.getData());
 
         DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
         assertEquals("getPath", deletePath, delete.getPath());
     }
 
index 67fa0960cbcb96cc2f617148aab2c7a7a7bab71d..9761ed8615a763df20ec6763defc69b55395a120 100644 (file)
@@ -31,7 +31,10 @@ public class TestModel {
   private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
 
   public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
-  public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build();
+  public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+          node(OUTER_LIST_QNAME).build();
+  public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+          node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
   public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
   public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");