Merge "BUG 2412 - restconf @GET getModule(uri) method migration"
authorTony Tkacik <ttkacik@cisco.com>
Wed, 25 Feb 2015 09:28:42 +0000 (09:28 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 25 Feb 2015 09:28:42 +0000 (09:28 +0000)
88 files changed:
features/mdsal/src/main/resources/features.xml
opendaylight/adsal/features/nsf/src/main/resources/features.xml
opendaylight/md-sal/sal-clustering-config/pom.xml
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/NormalizedNodeJsonBodyWriter.java
opendaylight/networkconfiguration/neutron/northbound/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/INeutronRequest.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronFirewallAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronFirewallCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronFirewallPolicyAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronFirewallPolicyCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronFirewallRuleAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronFirewallRuleCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronFloatingIPAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronFloatingIPCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerHealthMonitorAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerHealthMonitorCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerListenerAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerListenerCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerPoolAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerPoolCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerPoolMemberAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronLoadBalancerPoolMemberCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronNetworkAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronNetworkCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronObject.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronPortAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronPortCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronRouterAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronRouterCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronSecurityGroupAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronSecurityGroupCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronSecurityRuleAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronSecurityRuleCRUD.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronSubnetAware.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/INeutronSubnetCRUD.java

index 4b9f8c2288cb8db0e509e5b13774867ed1c62af8..bec365cda09b50a307d34959e45f1980ca8c9862 100644 (file)
         <configfile finalname="configuration/initial/akka.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/akkaconf</configfile>
         <configfile finalname="configuration/initial/module-shards.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleshardconf</configfile>
         <configfile finalname="configuration/initial/modules.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleconf</configfile>
+        <configfile finalname="etc/org.opendaylight.controller.cluster.datastore.cfg">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/cfg/datastore</configfile>
     </feature>
 
     <feature name='odl-clustering-test-app' version='${project.version}'>
index e8f7bc1e5c8b9522c16d8851ae26393bc4af2225..692faa746e1fdfcc766032697584cefe4ac3808e 100644 (file)
         -->
     </feature>
 
+    <feature name="odl-nsf-service" description="OpenDaylight :: NSF :: Network Service Functions in Controller" version="${project.version}">
+        <feature version="${sal.version}">odl-adsal-all</feature>
+        <feature version="${project.version}">odl-nsf-controller-managers</feature>
+        <feature version="${project.version}">odl-adsal-controller-northbound</feature>
+    </feature>
+
     <feature name="odl-nsf-managers" description="OpenDaylight :: AD-SAL :: Network Service Functions" version="${project.version}">
         <feature version="${commons.opendaylight.version}">odl-base-all</feature>
         <feature version="${sal.version}">odl-adsal-all</feature>
         <bundle>mvn:org.opendaylight.controller/routing.dijkstra_implementation/${routing.dijkstra_implementation.version}</bundle>
     </feature>
 
+    <feature name="odl-nsf-controller-managers" description="OpenDaylight :: AD-SAL :: Network Service Functions in Controller" version="${project.version}">
+        <feature version="${commons.opendaylight.version}">odl-base-all</feature>
+        <feature version="${sal.version}">odl-adsal-all</feature>
+        <bundle>mvn:org.opendaylight.controller/usermanager/${usermanager.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/usermanager.implementation/${usermanager.version}</bundle>
+
+        <bundle>mvn:org.opendaylight.controller/appauth/${appauth.version}</bundle>
+
+        <bundle>mvn:org.opendaylight.controller/connectionmanager/${connectionmanager.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/connectionmanager.implementation/${connectionmanager.version}</bundle>
+
+        <bundle>mvn:org.opendaylight.controller/containermanager/${containermanager.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/containermanager.implementation/${containermanager.version}</bundle>
+
+        <bundle>mvn:org.opendaylight.controller/statisticsmanager/${statisticsmanager.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/statisticsmanager.implementation/${statisticsmanager.implementation.version}</bundle>
+
+        <bundle>mvn:org.opendaylight.controller/switchmanager/${switchmanager.api.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/switchmanager.implementation/${switchmanager.implementation.version}</bundle>
+
+        <bundle>mvn:org.opendaylight.controller/forwardingrulesmanager/${forwardingrulesmanager.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/forwardingrulesmanager.implementation/${forwardingrulesmanager.implementation.version}</bundle>
+
+        <bundle>mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}</bundle>
+
+        <bundle>mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}</bundle>
+
+        <bundle>mvn:org.opendaylight.controller/forwarding.staticrouting/${forwarding.staticrouting}</bundle>
+
+        <bundle>mvn:org.opendaylight.controller.thirdparty/net.sf.jung2/2.0.1</bundle>
+        <bundle>mvn:org.opendaylight.controller/routing.dijkstra_implementation/${routing.dijkstra_implementation.version}</bundle>
+    </feature>
+
     <feature name="odl-adsal-northbound" description="OpenDaylight :: AD-SAL :: Northbound APIs" version="${project.version}">
         <feature version="${commons.opendaylight.version}">odl-base-all</feature>
         <feature version="${project.version}">odl-nsf-managers</feature>
         <bundle>mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}</bundle>
     </feature>
+
+    <feature name="odl-adsal-controller-northbound" description="OpenDaylight :: AD-SAL :: Northbound APIs in Controller" version="${project.version}">
+        <feature version="${commons.opendaylight.version}">odl-base-all</feature>
+        <feature version="${project.version}">odl-nsf-managers</feature>
+        <bundle>mvn:org.ow2.asm/asm-all/${asm.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/bundlescanner/${bundlescanner.api.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/bundlescanner.implementation/${bundlescanner.implementation.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/commons.northbound/${northbound.commons.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/connectionmanager.northbound/${connectionmanager.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/flowprogrammer.northbound/${flowprogrammer.northbound.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/hosttracker.northbound/${hosttracker.northbound.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/networkconfig.bridgedomain.northbound/${networkconfig.bridgedomain.northbound.version}</bundle>
+        <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/${eclipse.persistence.version}</bundle>
+        <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.core/${eclipse.persistence.version}</bundle>
+        <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/${eclipse.persistence.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/forwarding.staticrouting.northbound/${forwarding.staticrouting.northbound.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/statistics.northbound/${statistics.northbound.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/subnets.northbound/${subnets.northbound.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/switchmanager.northbound/${switchmanager.northbound.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}</bundle>
+    </feature>
 </features>
index 1c018cade1b01f82a3fa7a8d550ab2c10ed080e5..57da717daf82b46bac1185f6ad537f3288034792 100644 (file)
                   <type>xml</type>
                   <classifier>moduleconf</classifier>
                 </artifact>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/datastore.cfg</file>
+                  <type>cfg</type>
+                  <classifier>datastore</classifier>
+                </artifact>
               </artifacts>
             </configuration>
           </execution>
diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
new file mode 100644 (file)
index 0000000..8e03223
--- /dev/null
@@ -0,0 +1,75 @@
+# This file specifies property settings for the clustered data store to control its behavior. A
+# property may be applied to every data store type ("config" and "operational") or can be customized
+# differently for each data store type by prefixing the data store type + '.'. For example, specifying
+# the "shard-election-timeout-factor" property would be applied to both data stores whereas specifying
+# "operational.shard-election-timeout-factor" would only apply to the "operational" data store. Similarly,
+# specifying "config.shard-election-timeout-factor" would only apply to the "config" data store.
+
+# The multiplication factor to be used to determine shard election timeout. The shard election timeout
+# is determined by multiplying shardHeartbeatIntervalInMillis with the shardElectionTimeoutFactor.
+#shard-election-timeout-factor=2
+
+# The interval at which a shard will send a heart beat message to its remote shard.
+#shard-heartbeat-interval-in-millis=500
+
+# The maximum amount of time to wait for a shard to elect a leader before failing an operation (eg transaction create).
+#shard-leader-election-timeout-in-seconds=30
+
+# Enable or disable data persistence.
+#persistent=true
+
+# Disable persistence for the operational data store by default.
+operational.persistent=false
+
+# The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.
+#shard-transaction-idle-timeout-in-minutes=10
+
+# The maximum amount of time a shard transaction three-phase commit can be idle without receiving the 
+# next messages before it aborts the transaction.
+#shard-transaction-commit-timeout-in-seconds=30
+
+# The maximum allowed capacity for each shard's transaction commit queue.
+#shard-transaction-commit-queue-capacity=20000
+
+# The maximum amount of time to wait for a shard to initialize from persistence on startup before 
+# failing an operation (eg transaction create and change listener registration).
+#shard-initialization-timeout-in-seconds=300
+
+# The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.
+#shard-journal-recovery-log-batch-size=5000
+
+# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
+#shard-snapshot-batch-count=20000
+
+# The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken.
+#shard-snapshot-data-threshold-percentage=12
+
+# The interval at which the shard leader checks that a majority of its followers are active; if not,
+# it deems itself isolated.
+#shard-isolated-leader-check-interval-in-millis=5000
+
+# The number of transaction modification operations (put, merge, delete) to batch before sending to the
+# shard transaction actor. Batching improves performance, as fewer modification messages are sent to the
+# actor, which lessens the chance that the transaction actor's mailbox queue fills up.
+#shard-batched-modification-count=100
+
+# The maximum amount of time for akka operations (remote or local) to complete before failing.
+#operation-timeout-in-seconds=5
+
+# The initial number of transactions per second that are allowed before the data store should begin
+# applying back pressure. This number is only used as initial guidance; subsequently, the data store
+# measures the latency of each commit and auto-adjusts the rate limit.
+#transaction-creation-initial-rate-limit=100
+
+# The maximum thread pool size for each shard's data store data change notification executor.
+#max-shard-data-change-executor-pool-size=20
+
+# The maximum queue size for each shard's data store data change notification executor.
+#max-shard-data-change-executor-queue-size=1000
+
+# The maximum queue size for each shard's data store data change listener.
+#max-shard-data-change-listener-queue-size=1000
+
+# The maximum queue size for each shard's data store executor.
+#max-shard-data-store-executor-queue-size=5000
+
index cee781fb88e535a04251f66b948e1d1123169a5c..7f8a4e779d0446c769f5b096b1cbb0aace8b1370 100644 (file)
@@ -42,6 +42,7 @@ public class DatastoreContext {
     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100;
 
     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
@@ -54,19 +55,48 @@ public class DatastoreContext {
     private boolean persistent = DEFAULT_PERSISTENT;
     private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
-    private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+    private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+    private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
 
-    private DatastoreContext(){
+    private DatastoreContext() {
         setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
         setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
         setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
+        setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
+    }
+
+    private DatastoreContext(DatastoreContext other) {
+        this.dataStoreProperties = other.dataStoreProperties;
+        this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
+        this.operationTimeoutInSeconds = other.operationTimeoutInSeconds;
+        this.dataStoreMXBeanType = other.dataStoreMXBeanType;
+        this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds;
+        this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
+        this.shardInitializationTimeout = other.shardInitializationTimeout;
+        this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
+        this.persistent = other.persistent;
+        this.configurationReader = other.configurationReader;
+        this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
+        this.dataStoreType = other.dataStoreType;
+        this.shardBatchedModificationCount = other.shardBatchedModificationCount;
+
+        setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
+        setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
+        setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
+        setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
+        setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
+        setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
     }
 
     public static Builder newBuilder() {
-        return new Builder();
+        return new Builder(new DatastoreContext());
+    }
+
+    public static Builder newBuilderFrom(DatastoreContext context) {
+        return new Builder(new DatastoreContext(context));
     }
 
     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
@@ -148,18 +178,60 @@ public class DatastoreContext {
         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
     }
 
-    private void setSnapshotBatchCount(int shardSnapshotBatchCount) {
+    private void setSnapshotBatchCount(long shardSnapshotBatchCount) {
         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
+    public int getShardBatchedModificationCount() {
+        return shardBatchedModificationCount;
+    }
+
     public static class Builder {
-        private DatastoreContext datastoreContext = new DatastoreContext();
+        private final DatastoreContext datastoreContext;
+        private int maxShardDataChangeExecutorPoolSize =
+                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
+        private int maxShardDataChangeExecutorQueueSize =
+                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE;
+        private int maxShardDataChangeListenerQueueSize =
+                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE;
+        private int maxShardDataStoreExecutorQueueSize =
+                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
+
+        private Builder(DatastoreContext datastoreContext) {
+            this.datastoreContext = datastoreContext;
+
+            if(datastoreContext.getDataStoreProperties() != null) {
+                maxShardDataChangeExecutorPoolSize =
+                        datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
+                maxShardDataChangeExecutorQueueSize =
+                        datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
+                maxShardDataChangeListenerQueueSize =
+                        datastoreContext.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
+                maxShardDataStoreExecutorQueueSize =
+                        datastoreContext.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
+            }
+        }
+
+        public Builder boundedMailboxCapacity(int boundedMailboxCapacity) {
+            // TODO - this is defined in the yang DataStoreProperties but not currently used.
+            return this;
+        }
 
-        public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
-            datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+        public Builder enableMetricCapture(boolean enableMetricCapture) {
+            // TODO - this is defined in the yang DataStoreProperties but not currently used.
             return this;
         }
 
+
+        public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) {
+            datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
+            return this;
+        }
+
+        public Builder shardTransactionIdleTimeoutInMinutes(long timeout) {
+            return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
+        }
+
         public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
             datastoreContext.operationTimeoutInSeconds = operationTimeoutInSeconds;
             return this;
@@ -170,11 +242,6 @@ public class DatastoreContext {
             return this;
         }
 
-        public Builder dataStoreProperties(InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
-            datastoreContext.dataStoreProperties = dataStoreProperties;
-            return this;
-        }
-
         public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
             return this;
@@ -210,11 +277,19 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder shardInitializationTimeoutInSeconds(long timeout) {
+            return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
+        }
+
         public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
             return this;
         }
 
+        public Builder shardLeaderElectionTimeoutInSeconds(long timeout) {
+            return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
+        }
+
         public Builder configurationReader(ConfigurationReader configurationReader){
             datastoreContext.configurationReader = configurationReader;
             return this;
@@ -246,7 +321,35 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
+            datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
+            return this;
+        }
+
+        public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
+            this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
+            return this;
+        }
+
+        public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) {
+            this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
+            return this;
+        }
+
+        public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) {
+            this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
+            return this;
+        }
+
+        public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) {
+            this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
+            return this;
+        }
+
         public DatastoreContext build() {
+            datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
+                    maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
+                    maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
             return datastoreContext;
         }
     }
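
The DatastoreContext changes above replace the single default Builder with a copy-based one: newBuilder() still starts from defaults, while the new newBuilderFrom(context) seeds the Builder with a private copy of an existing context so individual settings can be overridden without mutating the shared instance. A minimal usage sketch, using only setters added in this change (illustrative, not taken from the patch itself):

    DatastoreContext defaults = DatastoreContext.newBuilder()
            .operationTimeoutInSeconds(5)
            .build();

    // Derive a tuned context; the original instance is left untouched.
    DatastoreContext tuned = DatastoreContext.newBuilderFrom(defaults)
            .shardBatchedModificationCount(200)
            .shardTransactionIdleTimeoutInMinutes(20)
            .build();

    // defaults.getShardBatchedModificationCount() == 100 (DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT)
    // tuned.getShardBatchedModificationCount()    == 200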
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlay.java
new file mode 100644 (file)
index 0000000..20ee555
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that overlays DatastoreContext settings with settings obtained from an OSGi Config Admin
+ * service.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreContextConfigAdminOverlay implements AutoCloseable {
+    public static final String CONFIG_ID = "org.opendaylight.controller.cluster.datastore";
+
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextConfigAdminOverlay.class);
+
+    private final DatastoreContextIntrospector introspector;
+    private final BundleContext bundleContext;
+
+    public DatastoreContextConfigAdminOverlay(DatastoreContextIntrospector introspector, BundleContext bundleContext) {
+        this.introspector = introspector;
+        this.bundleContext = bundleContext;
+
+        ServiceReference<ConfigurationAdmin> configAdminServiceReference =
+                bundleContext.getServiceReference(ConfigurationAdmin.class);
+        if(configAdminServiceReference == null) {
+            LOG.warn("No ConfigurationAdmin service found");
+        } else {
+            overlaySettings(configAdminServiceReference);
+        }
+    }
+
+    private void overlaySettings(ServiceReference<ConfigurationAdmin> configAdminServiceReference) {
+        try {
+            ConfigurationAdmin configAdmin = bundleContext.getService(configAdminServiceReference);
+
+            Configuration config = configAdmin.getConfiguration(CONFIG_ID);
+            if(config != null) {
+                Dictionary<String, Object> properties = config.getProperties();
+
+                LOG.debug("Overlaying settings: {}", properties);
+
+                introspector.update(properties);
+            } else {
+                LOG.debug("No Configuration found for {}", CONFIG_ID);
+            }
+        } catch (IOException e) {
+            LOG.error("Error obtaining Configuration for pid {}", CONFIG_ID, e);
+        } catch(IllegalStateException e) {
+            // Ignore - indicates the bundleContext has been closed.
+        } finally {
+            try {
+                bundleContext.ungetService(configAdminServiceReference);
+            } catch (Exception e) {
+                LOG.debug("Error from ungetService", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+}
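
DatastoreContextConfigAdminOverlay reads the Configuration registered under the pid org.opendaylight.controller.cluster.datastore (the datastore.cfg file added above) and hands its property Dictionary to the introspector once, at construction time. A hedged sketch of how such properties could be supplied programmatically through the standard OSGi ConfigurationAdmin API; the configAdmin reference is assumed to have been obtained from the bundle context and is not part of this patch:

    Hashtable<String, Object> props = new Hashtable<>();
    props.put("operation-timeout-in-seconds", "10");
    props.put("operational.persistent", "false");

    // Standard OSGi Config Admin calls; nothing here is specific to this change.
    Configuration config = configAdmin.getConfiguration(DatastoreContextConfigAdminOverlay.CONFIG_ID);
    config.update(props);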
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java
new file mode 100644 (file)
index 0000000..3ca6421
--- /dev/null
@@ -0,0 +1,315 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.primitives.Primitives;
+import java.beans.BeanInfo;
+import java.beans.ConstructorProperties;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.MethodDescriptor;
+import java.beans.PropertyDescriptor;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.text.WordUtils;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Introspects on a DatastoreContext instance to set its properties via reflection.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreContextIntrospector {
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextIntrospector.class);
+
+    private static final Map<String, Class<?>> dataStorePropTypes = new HashMap<>();
+
+    private static final Map<Class<?>, Constructor<?>> constructors = new HashMap<>();
+
+    private static final Map<Class<?>, Method> yangTypeGetters = new HashMap<>();
+
+    private static final Map<String, Method> builderSetters = new HashMap<>();
+
+    static {
+        try {
+            introspectDatastoreContextBuilder();
+            introspectDataStoreProperties();
+            introspectPrimitiveTypes();
+        } catch (IntrospectionException e) {
+            LOG.error("Error initializing DatastoreContextIntrospector", e);
+        }
+    }
+
+    /**
+     * Introspects each primitive wrapper (ie Integer, Long etc) and String type to find the
+     * constructor that takes a single String argument. For primitive wrappers, this constructor
+     * converts from a String representation.
+     */
+    private static void introspectPrimitiveTypes() {
+
+        Set<Class<?>> primitives = ImmutableSet.<Class<?>>builder().addAll(
+                Primitives.allWrapperTypes()).add(String.class).build();
+        for(Class<?> primitive: primitives) {
+            try {
+                processPropertyType(primitive);
+            } catch (Exception e) {
+                // Ignore primitives that can't be constructed from a String, eg Character and Void.
+            }
+        }
+    }
+
+    /**
+     * Introspects the DatastoreContext.Builder class to find all its setter methods that we will
+     * invoke via reflection. We can't use the bean Introspector here as the Builder setters don't
+     * follow the bean property naming convention, ie setter prefixed with "set", so look for all
+     * the methods that return Builder.
+     */
+    private static void introspectDatastoreContextBuilder() {
+        for(Method method: Builder.class.getMethods()) {
+            if(Builder.class.equals(method.getReturnType())) {
+                builderSetters.put(method.getName(), method);
+            }
+        }
+    }
+
+    /**
+     * Introspects the DataStoreProperties interface that is generated from the DataStoreProperties
+     * yang grouping. We use the bean Introspector to find the types of all the properties defined
+     * in the interface (this is the type returned from the getter method). For each type, we find
+     * the appropriate constructor that we will use.
+     */
+    private static void introspectDataStoreProperties() throws IntrospectionException {
+        BeanInfo beanInfo = Introspector.getBeanInfo(DataStoreProperties.class);
+        for(PropertyDescriptor desc: beanInfo.getPropertyDescriptors()) {
+            processDataStoreProperty(desc.getName(), desc.getPropertyType());
+        }
+
+        // Getter methods that return Boolean and start with "is" instead of "get" aren't recognized as
+        // properties and thus aren't returned from getPropertyDescriptors. A getter starting with
+        // "is" is only supported if it returns primitive boolean. So we'll check for these via
+        // getMethodDescriptors.
+        for(MethodDescriptor desc: beanInfo.getMethodDescriptors()) {
+            String methodName = desc.getName();
+            if(Boolean.class.equals(desc.getMethod().getReturnType()) && methodName.startsWith("is")) {
+                String propertyName = WordUtils.uncapitalize(methodName.substring(2));
+                processDataStoreProperty(propertyName, Boolean.class);
+            }
+        }
+    }
+
+    /**
+     * Processes a property defined on the DataStoreProperties interface.
+     */
+    private static void processDataStoreProperty(String name, Class<?> propertyType) {
+        Preconditions.checkArgument(builderSetters.containsKey(name), String.format(
+                "DataStoreProperties property \"%s\" does not have corresponding setter in DatastoreContext.Builder", name));
+        try {
+            processPropertyType(propertyType);
+            dataStorePropTypes.put(name, propertyType);
+        } catch (Exception e) {
+            LOG.error("Error finding constructor for type {}", propertyType, e);
+        }
+    }
+
+    /**
+     * Finds the appropriate constructor for the specified type that we will use to construct
+     * instances.
+     */
+    private static void processPropertyType(Class<?> propertyType) throws Exception {
+        Class<?> wrappedType = Primitives.wrap(propertyType);
+        if(constructors.containsKey(wrappedType)) {
+            return;
+        }
+
+        // If the type is a primitive (or String type), we look for the constructor that takes a
+        // single String argument, which, for primitives, validates and converts from a String
+        // representation which is the form we get on ingress.
+        if(propertyType.isPrimitive() || Primitives.isWrapperType(propertyType) ||
+                propertyType.equals(String.class))
+        {
+            constructors.put(wrappedType, propertyType.getConstructor(String.class));
+        } else {
+            // This must be a yang-defined type. We need to find the constructor that takes a
+            // primitive as the only argument. This will be used to construct instances to perform
+            // validation (eg range checking). The yang-generated types have a couple single-argument
+            // constructors but the one we want has the bean ConstructorProperties annotation.
+            for(Constructor<?> ctor: propertyType.getConstructors()) {
+                ConstructorProperties ctorPropsAnnotation = ctor.getAnnotation(ConstructorProperties.class);
+                if(ctor.getParameterTypes().length == 1 && ctorPropsAnnotation != null) {
+                    findYangTypeGetter(propertyType, ctorPropsAnnotation.value()[0]);
+                    constructors.put(propertyType, ctor);
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Finds the getter method on a yang-generated type for the specified property name.
+     */
+    private static void findYangTypeGetter(Class<?> type, String propertyName)
+            throws Exception {
+        for(PropertyDescriptor desc: Introspector.getBeanInfo(type).getPropertyDescriptors()) {
+            if(desc.getName().equals(propertyName)) {
+                yangTypeGetters.put(type, desc.getReadMethod());
+                return;
+            }
+        }
+
+        throw new IllegalArgumentException(String.format(
+                "Getter method for constructor property %s not found for YANG type %s",
+                propertyName, type));
+    }
+
+    private DatastoreContext context;
+
+    public DatastoreContextIntrospector(DatastoreContext context) {
+        this.context = context;
+    }
+
+    public DatastoreContext getContext() {
+        return context;
+    }
+
+    /**
+     * Applies the given properties to the cached DatastoreContext and yields a new DatastoreContext
+     * instance, which can be obtained via {@link #getContext()}.
+     *
+     * @param properties the properties to apply
+     * @return true if the cached DatastoreContext was updated, false otherwise.
+     */
+    public boolean update(Dictionary<String, Object> properties) {
+        if(properties == null || properties.isEmpty()) {
+            return false;
+        }
+
+        Builder builder = DatastoreContext.newBuilderFrom(context);
+
+        final String dataStoreTypePrefix = context.getDataStoreType() + '.';
+
+        // Sort the property keys by putting the names prefixed with the data store type last. This
+        // is done so data store specific settings are applied after global settings.
+        ArrayList<String> keys = Collections.list(properties.keys());
+        Collections.sort(keys, new Comparator<String>() {
+            @Override
+            public int compare(String key1, String key2) {
+                return key1.startsWith(dataStoreTypePrefix) ? 1 :
+                           key2.startsWith(dataStoreTypePrefix) ? -1 : key1.compareTo(key2);
+            }
+        });
+
+        boolean updated = false;
+        for(String key: keys) {
+            Object value = properties.get(key);
+            try {
+                // If the key is prefixed with the data store type, strip it off.
+                if(key.startsWith(dataStoreTypePrefix)) {
+                    key = key.replaceFirst(dataStoreTypePrefix, "");
+                }
+
+                key = convertToCamelCase(key);
+
+                // Convert the value to the right type.
+                value = convertValue(key, value);
+                if(value == null) {
+                    continue;
+                }
+
+                LOG.debug("Converted value for property {}: {} ({})",
+                        key, value, value.getClass().getSimpleName());
+
+                // Call the setter method on the Builder instance.
+                Method setter = builderSetters.get(key);
+                setter.invoke(builder, constructorValueRecursively(
+                        Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
+
+                updated = true;
+
+            } catch (Exception e) {
+                LOG.error("Error converting value ({}) for property {}", value, key, e);
+            }
+        }
+
+        if(updated) {
+            context = builder.build();
+        }
+
+        return updated;
+    }
+
+    private String convertToCamelCase(String inString) {
+        String str = inString.trim();
+        if(StringUtils.contains(str, '-') || StringUtils.contains(str, ' ')) {
+            str = inString.replace('-', ' ');
+            str = WordUtils.capitalizeFully(str);
+            str = StringUtils.deleteWhitespace(str);
+        }
+
+        return StringUtils.uncapitalize(str);
+    }
+
+    private Object convertValue(String name, Object from) throws Exception {
+        Class<?> propertyType = dataStorePropTypes.get(name);
+        if(propertyType == null) {
+            LOG.debug("Property not found for {}", name);
+            return null;
+        }
+
+        LOG.debug("Type for property {}: {}, converting value {} ({})",
+                name, propertyType.getSimpleName(), from, from.getClass().getSimpleName());
+
+        // Recurse the chain of constructors depth-first to get the resulting value. Eg, if the
+        // property type is the yang-generated NonZeroUint32Type, its constructor takes a Long so
+        // we have to first construct a Long instance from the input value.
+        Object converted = constructorValueRecursively(propertyType, from.toString());
+
+        // If the converted type is a yang-generated type, call the getter to obtain the actual value.
+        Method getter = yangTypeGetters.get(converted.getClass());
+        if(getter != null) {
+            converted = getter.invoke(converted);
+        }
+
+        return converted;
+    }
+
+    private Object constructorValueRecursively(Class<?> toType, Object fromValue) throws Exception {
+        LOG.debug("convertValueRecursively - toType: {}, fromValue {} ({})",
+                toType.getSimpleName(), fromValue, fromValue.getClass().getSimpleName());
+
+        Constructor<?> ctor = constructors.get(toType);
+
+        LOG.debug("Found {}", ctor);
+
+        if(ctor == null) {
+            throw new IllegalArgumentException(String.format("Constructor not found for type %s", toType));
+        }
+
+        Object value = fromValue;
+
+        // Since the original input type is a String, once we find a constructor that takes a String
+        // argument, we're done recursing.
+        if(!ctor.getParameterTypes()[0].equals(String.class)) {
+            value = constructorValueRecursively(ctor.getParameterTypes()[0], fromValue);
+        }
+
+        return ctor.newInstance(value);
+    }
+}
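
DatastoreContextIntrospector.update() is the bridge between the flat datastore.cfg keys and the Builder setters: it sorts keys so data-store-type-prefixed entries are applied after global ones, strips the prefix, converts kebab-case names to the camel-case setter names, constructs each value through the yang-generated type (for range validation), and finally builds a fresh DatastoreContext. A small sketch of that flow, with hypothetical property values:

    Dictionary<String, Object> props = new Hashtable<>();
    props.put("shard-transaction-idle-timeout-in-minutes", "20"); // global setting
    props.put("operation-timeout-in-seconds", "10");

    DatastoreContextIntrospector introspector =
            new DatastoreContextIntrospector(DatastoreContext.newBuilder().build());

    if (introspector.update(props)) {
        // A new context instance with the converted values applied.
        DatastoreContext updated = introspector.getContext();
    }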
index afbdbe1fe9859d15bbcf5d319112c87c5d054698..434efc9111f48e3910af12386d3cd1d4f5e7360a 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorSystem;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
@@ -40,6 +41,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     private final ActorContext actorContext;
 
+    private AutoCloseable closeable;
+
+    private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
+
     public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
             Configuration configuration, DatastoreContext datastoreContext) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
@@ -60,12 +65,20 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
                 ShardManager.props(cluster, configuration, datastoreContext)
                         .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ),
                 cluster, configuration, datastoreContext);
+
+        datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType());
+        datastoreConfigMXBean.setContext(datastoreContext);
+        datastoreConfigMXBean.registerMBean();
     }
 
     public DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
     }
 
+    public void setCloseable(AutoCloseable closeable) {
+        this.closeable = closeable;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
@@ -115,7 +128,17 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
+        datastoreConfigMXBean.unregisterMBean();
+
+        if(closeable != null) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                LOG.debug("Error closing insance", e);
+            }
+        }
+
         actorContext.shutdown();
     }
 
index a9a735ede78de6e31e92fb2f638e6a793e1f2480..f3a55c55a720fe21e8718cc4a346c13706074f49 100644 (file)
@@ -23,16 +23,21 @@ public class DistributedDataStoreFactory {
     private static volatile ActorSystem persistentActorSystem = null;
 
     public static DistributedDataStore createInstance(SchemaService schemaService,
-                                                      DatastoreContext datastoreContext, BundleContext bundleContext) {
+            DatastoreContext datastoreContext, BundleContext bundleContext) {
+
+        DatastoreContextIntrospector introspector = new DatastoreContextIntrospector(datastoreContext);
+        DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                introspector, bundleContext);
 
         ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
-        final DistributedDataStore dataStore =
-                new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem),
-                        config, datastoreContext);
+        final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
+                new ClusterWrapperImpl(actorSystem), config, introspector.getContext());
 
         ShardStrategyFactory.setConfiguration(config);
         schemaService.registerSchemaContextListener(dataStore);
+
+        dataStore.setCloseable(overlay);
         return dataStore;
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java
new file mode 100644 (file)
index 0000000..65d82b7
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't
+ * support the BatchedModifications message.
+ *
+ * @author Thomas Pantelis
+ */
+class LegacyTransactionContextImpl extends TransactionContextImpl {
+
+    LegacyTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+            ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+            short remoteTransactionVersion, OperationCompleter operationCompleter) {
+        super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
+                remoteTransactionVersion,  operationCompleter);
+    }
+
+    @Override
+    public void deleteData(YangInstanceIdentifier path) {
+        recordedOperationFutures.add(executeOperationAsync(
+                new DeleteData(path, getRemoteTransactionVersion())));
+    }
+
+    @Override
+    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        recordedOperationFutures.add(executeOperationAsync(
+                new MergeData(path, data, getRemoteTransactionVersion())));
+    }
+
+    @Override
+    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        recordedOperationFutures.add(executeOperationAsync(
+                new WriteData(path, data, getRemoteTransactionVersion())));
+    }
+}
index 09fa61b570996919805aa24ff90d0975eb7c62c6..80aa3793c1a2daf140441152c80a03b2408b8dd6 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.Semaphore;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 
 final class OperationCompleter extends OnComplete<Object> {
     private final Semaphore operationLimiter;
@@ -19,7 +20,11 @@ final class OperationCompleter extends OnComplete<Object> {
     }
 
     @Override
-    public void onComplete(Throwable throwable, Object o){
-        this.operationLimiter.release();
+    public void onComplete(Throwable throwable, Object message) {
+        if(message instanceof BatchedModificationsReply) {
+            this.operationLimiter.release(((BatchedModificationsReply)message).getNumBatched());
+        } else {
+            this.operationLimiter.release();
+        }
     }
 }
\ No newline at end of file
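
Note: the permit accounting above pairs with the semaphore-based throttling in TransactionProxy further below — one permit is acquired per queued modification, and a batched reply releases one permit per modification it covered. A minimal, self-contained sketch of that pattern (hypothetical names, plain JDK only, not the actual controller classes):

import java.util.concurrent.Semaphore;

public class PermitAccountingSketch {
    public static void main(String[] args) throws InterruptedException {
        // Limits the number of outstanding operations per transaction.
        Semaphore operationLimiter = new Semaphore(1000);

        // Sender side: one permit per modification queued into a batch.
        int modificationsInBatch = 3;
        operationLimiter.acquire(modificationsInBatch);

        // Completer side: a batched reply releases as many permits as it
        // reports were applied; any other reply releases a single permit.
        int numBatched = modificationsInBatch; // would come from the reply
        operationLimiter.release(numBatched);

        System.out.println("available permits: " + operationLimiter.availablePermits());
    }
}
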
index af25df13d2865ba867d6a529d3ad947101b1510a..613b3749e086abc8cdea38fd322872f656235a91 100644 (file)
@@ -129,9 +129,9 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         try {
             final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
             Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-            ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
+            ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
 
-            sender().tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): readDataReply), self());
+            sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
 
         } catch (Exception e) {
             LOG.debug(String.format("Unexpected error reading path %s", path), e);
index a4a2f45fdbdda87cc1166aa0e169214eea0df313..d5dcfde803a16bfcc6ea285473167d9b78e8c5cb 100644 (file)
@@ -13,6 +13,8 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
@@ -24,6 +26,7 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -37,7 +40,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  */
 public class ShardWriteTransaction extends ShardTransaction {
 
-    private final MutableCompositeModification modification = new MutableCompositeModification();
+    private final MutableCompositeModification compositeModification = new MutableCompositeModification();
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
@@ -55,18 +58,12 @@ public class ShardWriteTransaction extends ShardTransaction {
     @Override
     public void handleReceive(Object message) throws Exception {
 
-        if (message instanceof WriteData) {
-            writeData(transaction, (WriteData) message, !SERIALIZED_REPLY);
-
-        } else if (message instanceof MergeData) {
-            mergeData(transaction, (MergeData) message, !SERIALIZED_REPLY);
-
-        } else if (message instanceof DeleteData) {
-            deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
-
+        if (message instanceof BatchedModifications) {
+            batchedModifications((BatchedModifications)message);
         } else if (message instanceof ReadyTransaction) {
             readyTransaction(transaction, !SERIALIZED_REPLY);
-
+        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readyTransaction(transaction, SERIALIZED_REPLY);
         } else if(WriteData.isSerializedType(message)) {
             writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
 
@@ -76,22 +73,32 @@ public class ShardWriteTransaction extends ShardTransaction {
         } else if(DeleteData.isSerializedType(message)) {
             deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
 
-        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(transaction, SERIALIZED_REPLY);
-
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
-            getSender().tell(new GetCompositeModificationReply(modification), getSelf());
+            getSender().tell(new GetCompositeModificationReply(compositeModification), getSelf());
         } else {
             super.handleReceive(message);
         }
     }
 
+    private void batchedModifications(BatchedModifications batched) {
+        try {
+            for(Modification modification: batched.getModifications()) {
+                compositeModification.addModification(modification);
+                modification.apply(transaction);
+            }
+
+            getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+        } catch (Exception e) {
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
+    }
+
     private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
             boolean returnSerialized) {
         LOG.debug("writeData at path : {}", message.getPath());
 
-        modification.addModification(
+        compositeModification.addModification(
                 new WriteModification(message.getPath(), message.getData()));
         try {
             transaction.write(message.getPath(), message.getData());
@@ -107,7 +114,7 @@ public class ShardWriteTransaction extends ShardTransaction {
             boolean returnSerialized) {
         LOG.debug("mergeData at path : {}", message.getPath());
 
-        modification.addModification(
+        compositeModification.addModification(
                 new MergeModification(message.getPath(), message.getData()));
 
         try {
@@ -124,7 +131,7 @@ public class ShardWriteTransaction extends ShardTransaction {
             boolean returnSerialized) {
         LOG.debug("deleteData at path : {}", message.getPath());
 
-        modification.addModification(new DeleteModification(message.getPath()));
+        compositeModification.addModification(new DeleteModification(message.getPath()));
         try {
             transaction.delete(message.getPath());
             DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
@@ -143,7 +150,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
         getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
-                cohort, modification, returnSerialized), getContext());
+                cohort, compositeModification, returnSerialized), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
         getSelf().tell(PoisonPill.getInstance(), getSelf());
index 03d1b3a6d736541dca754e49fa6f35ea67b7ed2b..1e222e4c0a667307ce9b7cf3c24585b7f61d5911 100644 (file)
@@ -15,18 +15,19 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -36,7 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
-final class TransactionContextImpl extends AbstractTransactionContext {
+class TransactionContextImpl extends AbstractTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
     private final ActorContext actorContext;
@@ -44,8 +45,9 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     private final ActorSelection actor;
     private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
-    private final OperationCompleter operationCompleter;
 
+    private final OperationCompleter operationCompleter;
+    private BatchedModifications batchedModifications;
 
     TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
             ActorContext actorContext, SchemaContext schemaContext,
@@ -69,13 +71,12 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         return actor;
     }
 
-    private Future<Object> executeOperationAsync(SerializableMessage msg) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+    protected short getRemoteTransactionVersion() {
+        return remoteTransactionVersion;
     }
 
-    private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
-        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
-                msg.toSerializable(remoteTransactionVersion)));
+    protected Future<Object> executeOperationAsync(SerializableMessage msg) {
+        return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
     }
 
     @Override
@@ -90,6 +91,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
                 identifier, recordedOperationFutures.size());
 
+        // Send the remaining batched modifications if any.
+
+        sendBatchedModifications();
+
         // Send the ReadyTransaction message to the Tx actor.
 
         final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
@@ -155,25 +160,48 @@ final class TransactionContextImpl extends AbstractTransactionContext {
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
+    private void batchModification(Modification modification) {
+        if(batchedModifications == null) {
+            batchedModifications = new BatchedModifications(remoteTransactionVersion);
+        }
+
+        batchedModifications.addModification(modification);
+
+        if(batchedModifications.getModifications().size() >=
+                actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
+            sendBatchedModifications();
+        }
+    }
+
+    private void sendBatchedModifications() {
+        if(batchedModifications != null) {
+            LOG.debug("Tx {} sending {} batched modifications", identifier,
+                    batchedModifications.getModifications().size());
+
+            recordedOperationFutures.add(executeOperationAsync(batchedModifications));
+            batchedModifications = null;
+        }
+    }
+
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         LOG.debug("Tx {} deleteData called path = {}", identifier, path);
 
-        recordedOperationFutures.add(executeOperationAsync(new DeleteData(path)));
+        batchModification(new DeleteModification(path));
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} mergeData called path = {}", identifier, path);
 
-        recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
+        batchModification(new MergeModification(path, data));
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} writeData called path = {}", identifier, path);
 
-        recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
+        batchModification(new WriteModification(path, data));
     }
 
     @Override
@@ -182,6 +210,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         LOG.debug("Tx {} readData called path = {}", identifier, path);
 
+        // Send the remaining batched modifications if any.
+
+        sendBatchedModifications();
+
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail the read.
@@ -263,6 +295,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         LOG.debug("Tx {} dataExists called path = {}", identifier, path);
 
+        // Send the remaining batched modifications if any.
+
+        sendBatchedModifications();
+
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail this
index d63ec8010dc714adc71c951bf7fd0a65937a4ce7..58b37be2a2727babd9b0305e868d85d90c079052 100644 (file)
@@ -304,7 +304,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     private void throttleOperation(int acquirePermits) {
         try {
-            if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+            if(!operationLimiter.tryAcquire(acquirePermits,
+                    actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
             }
         } catch (InterruptedException e) {
@@ -689,7 +690,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
             String transactionPath = reply.getTransactionPath();
 
-            LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
+            LOG.debug("Tx {} Received {}", identifier, reply);
 
             ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
 
@@ -707,8 +708,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
-            return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
+                return new TransactionContextImpl(transactionPath, transactionActor, identifier,
+                    actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            } else {
+                return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier,
+                        actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            }
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java
new file mode 100644 (file)
index 0000000..6b81792
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
+
+
+/**
+ * MXBean interface for data store configuration.
+ *
+ * @author Thomas Pantelis
+ */
+public interface DatastoreConfigurationMXBean {
+    long getShardTransactionIdleTimeoutInSeconds();
+
+    long getOperationTimeoutInSeconds();
+
+    long getShardHeartbeatIntervalInMillis();
+
+    int getShardJournalRecoveryLogBatchSize();
+
+    long getShardIsolatedLeaderCheckIntervalInMillis();
+
+    long getShardElectionTimeoutFactor();
+
+    int getShardSnapshotDataThresholdPercentage();
+
+    long getShardSnapshotBatchCount();
+
+    long getShardTransactionCommitTimeoutInSeconds();
+
+    int getShardTransactionCommitQueueCapacity();
+
+    long getShardInitializationTimeoutInSeconds();
+
+    long getShardLeaderElectionTimeoutInSeconds();
+
+    boolean isPersistent();
+
+    long getTransactionCreationInitialRateLimit();
+
+    int getMaxShardDataChangeExecutorPoolSize();
+
+    int getMaxShardDataChangeExecutorQueueSize();
+
+    int getMaxShardDataChangeListenerQueueSize();
+
+    int getMaxShardDataStoreExecutorQueueSize();
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java
new file mode 100644 (file)
index 0000000..79ff2a4
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+
+/**
+ * Implementation of DatastoreConfigurationMXBean.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreConfigurationMXBeanImpl extends AbstractMXBean implements DatastoreConfigurationMXBean {
+    public static final String JMX_CATEGORY_CONFIGURATION = "Configuration";
+
+    private DatastoreContext context;
+
+    public DatastoreConfigurationMXBeanImpl(String mxBeanType) {
+        super("Datastore", mxBeanType, JMX_CATEGORY_CONFIGURATION);
+    }
+
+    public void setContext(DatastoreContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public long getShardTransactionIdleTimeoutInSeconds() {
+        return context.getShardTransactionIdleTimeout().toSeconds();
+    }
+
+    @Override
+    public long getOperationTimeoutInSeconds() {
+        return context.getOperationTimeoutInSeconds();
+    }
+
+    @Override
+    public long getShardHeartbeatIntervalInMillis() {
+        return context.getShardRaftConfig().getHeartBeatInterval().toMillis();
+    }
+
+    @Override
+    public int getShardJournalRecoveryLogBatchSize() {
+        return context.getShardRaftConfig().getJournalRecoveryLogBatchSize();
+    }
+
+    @Override
+    public long getShardIsolatedLeaderCheckIntervalInMillis() {
+        return context.getShardRaftConfig().getIsolatedCheckIntervalInMillis();
+    }
+
+    @Override
+    public long getShardElectionTimeoutFactor() {
+        return context.getShardRaftConfig().getElectionTimeoutFactor();
+    }
+
+    @Override
+    public int getShardSnapshotDataThresholdPercentage() {
+        return context.getShardRaftConfig().getSnapshotDataThresholdPercentage();
+    }
+
+    @Override
+    public long getShardSnapshotBatchCount() {
+        return context.getShardRaftConfig().getSnapshotBatchCount();
+    }
+
+    @Override
+    public long getShardTransactionCommitTimeoutInSeconds() {
+        return context.getShardTransactionCommitTimeoutInSeconds();
+    }
+
+    @Override
+    public int getShardTransactionCommitQueueCapacity() {
+        return context.getShardTransactionCommitQueueCapacity();
+    }
+
+    @Override
+    public long getShardInitializationTimeoutInSeconds() {
+        return context.getShardInitializationTimeout().duration().toSeconds();
+    }
+
+    @Override
+    public long getShardLeaderElectionTimeoutInSeconds() {
+        return context.getShardLeaderElectionTimeout().duration().toSeconds();
+    }
+
+    @Override
+    public boolean isPersistent() {
+        return context.isPersistent();
+    }
+
+    @Override
+    public long getTransactionCreationInitialRateLimit() {
+        return context.getTransactionCreationInitialRateLimit();
+    }
+
+    @Override
+    public int getMaxShardDataChangeExecutorPoolSize() {
+        return context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
+    }
+
+    @Override
+    public int getMaxShardDataChangeExecutorQueueSize() {
+        return context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize();
+    }
+
+    @Override
+    public int getMaxShardDataChangeListenerQueueSize() {
+        return context.getDataStoreProperties().getMaxDataChangeListenerQueueSize();
+    }
+
+    @Override
+    public int getMaxShardDataStoreExecutorQueueSize() {
+        return context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize();
+    }
+}
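
The bean above exposes the effective DatastoreContext settings over JMX. A hedged example of reading one attribute from the platform MBeanServer follows; the ObjectName used here is an assumption for illustration only, since the real name is built by AbstractMXBean from the bean name ("Datastore"), the MXBean type passed to the constructor, and the "Configuration" category:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class QueryDatastoreConfig {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed ObjectName layout; verify the actual name with jconsole.
        ObjectName name = new ObjectName(
                "org.opendaylight.controller:type=DistributedConfigDatastore,Category=Configuration,name=Datastore");
        Object value = server.getAttribute(name, "OperationTimeoutInSeconds");
        System.out.println("OperationTimeoutInSeconds = " + value);
    }
}
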
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
new file mode 100644 (file)
index 0000000..670641f
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+
+/**
+ * Message used to batch write, merge, and delete modification operations to the ShardTransaction actor.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModifications extends MutableCompositeModification implements SerializableMessage {
+    private static final long serialVersionUID = 1L;
+
+    public BatchedModifications() {
+    }
+
+    public BatchedModifications(short version) {
+        super(version);
+    }
+
+    @Override
+    public Object toSerializable() {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java
new file mode 100644 (file)
index 0000000..33c5733
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * The reply for the BatchedModifications message.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModificationsReply extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
+
+    private int numBatched;
+
+    public BatchedModificationsReply() {
+    }
+
+    public BatchedModificationsReply(int numBatched) {
+        this.numBatched = numBatched;
+    }
+
+
+    public int getNumBatched() {
+        return numBatched;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        numBatched = in.readInt();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeInt(numBatched);
+    }
+
+    @Override
+    public Object toSerializable() {
+        return this;
+    }
+}
index ffd0f1ccf3cfcc79598aad6d03f171fb2c70de61..c2bf81fa8e75fd0b5da2a700d37ab142e4815042 100644 (file)
@@ -57,4 +57,11 @@ public class CreateTransactionReply implements SerializableMessage {
                 (short)o.getMessageVersion());
     }
 
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("CreateTransactionReply [transactionPath=").append(transactionPath).append(", transactionId=")
+                .append(transactionId).append(", version=").append(version).append("]");
+        return builder.toString();
+    }
 }
index 04bc63c5a5e73506dc9835ae4abf36915372bcee..5ba787c98322eb155ce5971a91807a5c11d3dc66 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -18,18 +17,22 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class DeleteData implements VersionedSerializableMessage, Externalizable {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class DeleteData extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
     public static final Class<DeleteData> SERIALIZABLE_CLASS = DeleteData.class;
 
     private YangInstanceIdentifier path;
-    private short version;
 
     public DeleteData() {
     }
 
-    public DeleteData(final YangInstanceIdentifier path) {
+    public DeleteData(final YangInstanceIdentifier path, short version) {
+        super(version);
         this.path = path;
     }
 
@@ -37,26 +40,21 @@ public class DeleteData implements VersionedSerializableMessage, Externalizable
         return path;
     }
 
-    public short getVersion() {
-        return version;
-    }
-
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        version = in.readShort(); // Read the version - don't need to do anything with it now
+        super.readExternal(in);
         path = SerializationUtils.deserializePath(in);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(version);
+        super.writeExternal(out);
         SerializationUtils.serializePath(path, out);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            version = toVersion;
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             // To base or R1 Helium version
@@ -71,7 +69,8 @@ public class DeleteData implements VersionedSerializableMessage, Externalizable
         } else {
             // From base or R1 Helium version
             ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
-            return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+            return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
index 0c6ff0e68d69d05e56871d3e197795387310232d..dd21b0e2e64071576b141c8062761bfa9491ef6d 100644 (file)
@@ -10,7 +10,12 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
 public class DeleteDataReply extends EmptyReply {
+    private static final long serialVersionUID = 1L;
 
     private static final Object LEGACY_SERIALIZED_INSTANCE =
             ShardTransactionMessages.DeleteDataReply.newBuilder().build();
index 284c6eff8d3488037fd63ca76b4326228bb4e3c4..38a37f0ccfedbe32f162db088533cee388b912fb 100644 (file)
@@ -14,7 +14,7 @@ import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
  *
  * @author Thomas Pantelis
  */
-public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage {
+public abstract class EmptyReply extends EmptyExternalizable {
 
     private final Object legacySerializedInstance;
 
@@ -23,7 +23,6 @@ public abstract class EmptyReply extends EmptyExternalizable implements Versione
         this.legacySerializedInstance = legacySerializedInstance;
     }
 
-    @Override
     public Object toSerializable(short toVersion) {
         return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance;
     }
index ae0d630cf267aa75b0ef403b8df7e7733d06e082..0f44733503ead5c1eb2a2e84bab07c284978fd3e 100644 (file)
@@ -16,7 +16,11 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class MergeData extends ModifyData implements VersionedSerializableMessage {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class MergeData extends ModifyData {
     private static final long serialVersionUID = 1L;
 
     public static final Class<MergeData> SERIALIZABLE_CLASS = MergeData.class;
@@ -24,14 +28,13 @@ public class MergeData extends ModifyData implements VersionedSerializableMessag
     public MergeData() {
     }
 
-    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        super(path, data);
+    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+        super(path, data, version);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            setVersion(toVersion);
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             // To base or R1 Helium version
@@ -50,7 +53,8 @@ public class MergeData extends ModifyData implements VersionedSerializableMessag
             ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
             Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
                     o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-            return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode());
+            return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
index a4c514bdbf0751b7399ba5928c367089a8210e82..6936ef14c52e6dbac89e9379d5d8230bf51a1d59 100644 (file)
@@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
 public class MergeDataReply extends EmptyReply {
     private static final long serialVersionUID = 1L;
 
index 69c41c2a5663f5021a7a22401aef48ca9b3986d2..bbd090f9291ccde682b33a46e748e6f618352754 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -17,17 +16,21 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Ap
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public abstract class ModifyData implements Externalizable {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public abstract class ModifyData extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
     private YangInstanceIdentifier path;
     private NormalizedNode<?, ?> data;
-    private short version;
 
     protected ModifyData() {
     }
 
-    protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+    protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+        super(version);
         this.path = path;
         this.data = data;
     }
@@ -40,23 +43,15 @@ public abstract class ModifyData implements Externalizable {
         return data;
     }
 
-    public short getVersion() {
-        return version;
-    }
-
-    protected void setVersion(short version) {
-        this.version = version;
-    }
-
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        version = in.readShort();
+        super.readExternal(in);
         SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(version);
+        super.writeExternal(out);
         SerializationUtils.serializePathAndNode(path, data, out);
     }
 
index 8ac6e1b1494a68fd3e7f2f04e0081a4d7f538a10..b0c163d87f346ccaefc300ce38f88573ab033b17 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.protobuf.ByteString;
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -19,18 +18,18 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class ReadDataReply implements VersionedSerializableMessage, Externalizable {
+public class ReadDataReply extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
     public static final Class<ReadDataReply> SERIALIZABLE_CLASS = ReadDataReply.class;
 
     private NormalizedNode<?, ?> normalizedNode;
-    private short version;
 
     public ReadDataReply() {
     }
 
-    public ReadDataReply(NormalizedNode<?, ?> normalizedNode) {
+    public ReadDataReply(NormalizedNode<?, ?> normalizedNode, short version) {
+        super(version);
         this.normalizedNode = normalizedNode;
     }
 
@@ -40,20 +39,19 @@ public class ReadDataReply implements VersionedSerializableMessage, Externalizab
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        version = in.readShort();
+        super.readExternal(in);
         normalizedNode = SerializationUtils.deserializeNormalizedNode(in);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(version);
+        super.writeExternal(out);
         SerializationUtils.serializeNormalizedNode(normalizedNode, out);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            version = toVersion;
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             return toSerializableReadDataReply(normalizedNode);
@@ -78,7 +76,8 @@ public class ReadDataReply implements VersionedSerializableMessage, Externalizab
         } else {
             ShardTransactionMessages.ReadDataReply o =
                     (ShardTransactionMessages.ReadDataReply) serializable;
-            return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()));
+            return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java
new file mode 100644 (file)
index 0000000..2a660fa
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Abstract base class for a versioned Externalizable message.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage {
+    private static final long serialVersionUID = 1L;
+
+    private short version;
+
+    public VersionedExternalizableMessage() {
+    }
+
+    public VersionedExternalizableMessage(short version) {
+        this.version = version;
+    }
+
+    public short getVersion() {
+        return version;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        version = in.readShort();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(version);
+    }
+}
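
Subclasses of this base put the version on the wire first and their own payload second, on both the read and write paths, by delegating to super before handling their own fields (as DeleteData and ReadDataReply do elsewhere in this change). A toy illustration of that ordering, independent of the controller's classes:

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

// Illustrative classes only, not the real messages: the base class serializes
// the version first and each subclass delegates to super before its payload.
class VersionedBase implements Externalizable {
    private short version;

    public VersionedBase() {
    }

    VersionedBase(short version) {
        this.version = version;
    }

    short getVersion() {
        return version;
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        version = in.readShort();
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeShort(version);
    }
}

class ExamplePathMessage extends VersionedBase {
    private String path;

    public ExamplePathMessage() {
    }

    ExamplePathMessage(short version, String path) {
        super(version);
        this.path = path;
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        super.readExternal(in); // version first
        path = in.readUTF();    // then the payload
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);
        out.writeUTF(path);
    }
}
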
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java
deleted file mode 100644 (file)
index 5c30b10..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-/**
- * Interface for a Serializable message with versioning.
- *
- * @author Thomas Pantelis
- */
-public interface VersionedSerializableMessage {
-    Object toSerializable(short toVersion);
-}
index 989949c88fb0f0c5400ea903119435a6d4ed0940..a4f648b6b3ccb1a99fe1f3c66241f6801c4290bc 100644 (file)
@@ -16,7 +16,11 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class WriteData extends ModifyData implements VersionedSerializableMessage {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class WriteData extends ModifyData {
     private static final long serialVersionUID = 1L;
 
     public static final Class<WriteData> SERIALIZABLE_CLASS = WriteData.class;
@@ -24,14 +28,13 @@ public class WriteData extends ModifyData implements VersionedSerializableMessag
     public WriteData() {
     }
 
-    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        super(path, data);
+    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+        super(path, data, version);
     }
 
     @Override
-    public Object toSerializable(short toVersion) {
-        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
-            setVersion(toVersion);
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
             return this;
         } else {
             // To base or R1 Helium version
@@ -50,7 +53,8 @@ public class WriteData extends ModifyData implements VersionedSerializableMessag
             ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
             Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
                     o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-            return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode());
+            return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(),
+                    DataStoreVersions.HELIUM_2_VERSION);
         }
     }
 
index 8255828819cd494a93fc61dacd32bc48ba55edba..3455571a518f1bf656369a23f2ea202ee53d6271 100644 (file)
@@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
 public class WriteDataReply extends EmptyReply {
     private static final long serialVersionUID = 1L;
 
index f04d00440405deab7e5a8be141fd624fd3c30f35..77f0858d7b1ef9119fb5b2bff9039de0d30e4536 100644 (file)
@@ -17,8 +17,10 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public abstract class AbstractModification implements Modification {
 
     private YangInstanceIdentifier path;
+    private short version;
 
-    protected AbstractModification() {
+    protected AbstractModification(short version) {
+        this.version = version;
     }
 
     protected AbstractModification(YangInstanceIdentifier path) {
@@ -32,4 +34,8 @@ public abstract class AbstractModification implements Modification {
     public YangInstanceIdentifier getPath() {
         return path;
     }
+
+    public short getVersion() {
+        return version;
+    }
 }
index 833f86fb981f1179ce326c4d3703f17c3449aa73..3a63f5b17361a0015ac43ba293bcd54ee29339cb 100644 (file)
@@ -25,6 +25,11 @@ public class DeleteModification extends AbstractModification {
     private static final long serialVersionUID = 1L;
 
     public DeleteModification() {
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public DeleteModification(short version) {
+        super(version);
     }
 
     public DeleteModification(YangInstanceIdentifier path) {
@@ -43,13 +48,11 @@ public class DeleteModification extends AbstractModification {
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        in.readShort();
         setPath(SerializationUtils.deserializePath(in));
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(DataStoreVersions.CURRENT_VERSION);
         SerializationUtils.serializePath(getPath(), out);
     }
 
@@ -66,8 +69,9 @@ public class DeleteModification extends AbstractModification {
         return new DeleteModification(InstanceIdentifierUtils.fromSerializable(o.getPath()));
     }
 
-    public static DeleteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
-        DeleteModification mod = new DeleteModification();
+    public static DeleteModification fromStream(ObjectInput in, short version)
+            throws ClassNotFoundException, IOException {
+        DeleteModification mod = new DeleteModification(version);
         mod.readExternal(in);
         return mod;
     }
index 571443eedd3a89f2ce9d474cbfbd883919a8d7ff..7ba74f4e7ff63b42e82d81ab6e82f7f22b1ede6f 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.modification;
 
 import java.io.IOException;
 import java.io.ObjectInput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
@@ -24,6 +25,11 @@ public class MergeModification extends WriteModification {
     private static final long serialVersionUID = 1L;
 
     public MergeModification() {
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public MergeModification(short version) {
+        super(version);
     }
 
     public MergeModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
@@ -47,8 +53,9 @@ public class MergeModification extends WriteModification {
         return new MergeModification(decoded.getDecodedPath(), decoded.getDecodedNode());
     }
 
-    public static MergeModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
-        MergeModification mod = new MergeModification();
+    public static MergeModification fromStream(ObjectInput in, short version)
+            throws ClassNotFoundException, IOException {
+        MergeModification mod = new MergeModification(version);
         mod.readExternal(in);
         return mod;
     }
index 5d7947b19fc6ddaeafe133ec546dfc879dd07855..b597742319f08a2c04a8b633baf6d525e97dff14 100644 (file)
@@ -27,10 +27,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 public class MutableCompositeModification implements CompositeModification {
     private static final long serialVersionUID = 1L;
 
-    private final List<Modification> modifications;
+    private final List<Modification> modifications = new ArrayList<>();
+    private short version;
 
     public MutableCompositeModification() {
-        modifications = new ArrayList<>();
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public MutableCompositeModification(short version) {
+        this.version = version;
     }
 
     @Override
@@ -45,6 +50,14 @@ public class MutableCompositeModification implements CompositeModification {
         return COMPOSITE;
     }
 
+    public short getVersion() {
+        return version;
+    }
+
+    public void setVersion(short version) {
+        this.version = version;
+    }
+
     /**
      * Add a new Modification to the list of Modifications represented by this
      * composite
@@ -62,7 +75,7 @@ public class MutableCompositeModification implements CompositeModification {
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        in.readShort();
+        version = in.readShort();
 
         int size = in.readInt();
 
@@ -75,15 +88,15 @@ public class MutableCompositeModification implements CompositeModification {
                 byte type = in.readByte();
                 switch(type) {
                 case Modification.WRITE:
-                    modifications.add(WriteModification.fromStream(in));
+                    modifications.add(WriteModification.fromStream(in, version));
                     break;
 
                 case Modification.MERGE:
-                    modifications.add(MergeModification.fromStream(in));
+                    modifications.add(MergeModification.fromStream(in, version));
                     break;
 
                 case Modification.DELETE:
-                    modifications.add(DeleteModification.fromStream(in));
+                    modifications.add(DeleteModification.fromStream(in, version));
                     break;
                 }
             }
@@ -94,7 +107,7 @@ public class MutableCompositeModification implements CompositeModification {
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(DataStoreVersions.CURRENT_VERSION);
+        out.writeShort(version);
 
         out.writeInt(modifications.size());
 
@@ -121,8 +134,7 @@ public class MutableCompositeModification implements CompositeModification {
         builder.setTimeStamp(System.nanoTime());
 
         for (Modification m : modifications) {
-            builder.addModification(
-                    (PersistentMessages.Modification) m.toSerializable());
+            builder.addModification((PersistentMessages.Modification) m.toSerializable());
         }
 
         return builder.build();
index 9c122c9adeef8a14cf05bfa877d38a5cbe310ae2..2fdca5f3792161400bf5e8cbbb0f4e13222bcad5 100644 (file)
@@ -31,6 +31,11 @@ public class WriteModification extends AbstractModification {
     private NormalizedNode<?, ?> data;
 
     public WriteModification() {
+        this(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public WriteModification(short version) {
+        super(version);
     }
 
     public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
@@ -54,14 +59,11 @@ public class WriteModification extends AbstractModification {
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        in.readShort(); // version
-
         SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeShort(DataStoreVersions.CURRENT_VERSION);
         SerializationUtils.serializePathAndNode(getPath(), data, out);
     }
 
@@ -81,8 +83,9 @@ public class WriteModification extends AbstractModification {
         return new WriteModification(decoded.getDecodedPath(), decoded.getDecodedNode());
     }
 
-    public static WriteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
-        WriteModification mod = new WriteModification();
+    public static WriteModification fromStream(ObjectInput in, short version)
+            throws ClassNotFoundException, IOException {
+        WriteModification mod = new WriteModification(version);
         mod.readExternal(in);
         return mod;
     }
index 7e8307465b9818d3c886098219065c9276b77e65..f78b134d42a5b6cdda965bf090ea18c64bfdea69 100644 (file)
@@ -1,14 +1,9 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
-import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.osgi.framework.BundleContext;
 
-import scala.concurrent.duration.Duration;
-
 public class DistributedConfigDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
     private BundleContext bundleContext;
@@ -42,23 +37,19 @@ public class DistributedConfigDataStoreProviderModule extends
 
         DatastoreContext datastoreContext = DatastoreContext.newBuilder()
                 .dataStoreType("config")
-                .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
-                        props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
-                        props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
-                        props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
-                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()))
-                .shardTransactionIdleTimeout(Duration.create(
-                        props.getShardTransactionIdleTimeoutInMinutes().getValue(), TimeUnit.MINUTES))
+                .maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue())
+                .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue())
+                .maxShardDataChangeListenerQueueSize(props.getMaxShardDataChangeListenerQueueSize().getValue().intValue())
+                .maxShardDataStoreExecutorQueueSize(props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue())
+                .shardTransactionIdleTimeoutInMinutes(props.getShardTransactionIdleTimeoutInMinutes().getValue())
                 .operationTimeoutInSeconds(props.getOperationTimeoutInSeconds().getValue())
                 .shardJournalRecoveryLogBatchSize(props.getShardJournalRecoveryLogBatchSize().
                         getValue().intValue())
                 .shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue())
                 .shardSnapshotDataThresholdPercentage(props.getShardSnapshotDataThresholdPercentage().getValue().intValue())
-                .shardHeartbeatIntervalInMillis(props.getShardHearbeatIntervalInMillis().getValue())
-                .shardInitializationTimeout(props.getShardInitializationTimeoutInSeconds().getValue(),
-                        TimeUnit.SECONDS)
-                .shardLeaderElectionTimeout(props.getShardLeaderElectionTimeoutInSeconds().getValue(),
-                        TimeUnit.SECONDS)
+                .shardHeartbeatIntervalInMillis(props.getShardHeartbeatIntervalInMillis().getValue())
+                .shardInitializationTimeoutInSeconds(props.getShardInitializationTimeoutInSeconds().getValue())
+                .shardLeaderElectionTimeoutInSeconds(props.getShardLeaderElectionTimeoutInSeconds().getValue())
                 .shardTransactionCommitTimeoutInSeconds(
                         props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
                 .shardTransactionCommitQueueCapacity(
@@ -67,7 +58,8 @@ public class DistributedConfigDataStoreProviderModule extends
                 .shardIsolatedLeaderCheckIntervalInMillis(
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
-                .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
+                .transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue())
+                .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
index 0655468531a16fe7a9ad2032a5f56d6f948044e3..6711a1007475d8a0cbb18ecb204a35e39491513a 100644 (file)
@@ -1,14 +1,9 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
-import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.osgi.framework.BundleContext;
 
-import scala.concurrent.duration.Duration;
-
 public class DistributedOperationalDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
     private BundleContext bundleContext;
@@ -42,23 +37,19 @@ public class DistributedOperationalDataStoreProviderModule extends
 
         DatastoreContext datastoreContext = DatastoreContext.newBuilder()
                 .dataStoreType("operational")
-                .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
-                        props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
-                        props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
-                        props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
-                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()))
-                .shardTransactionIdleTimeout(Duration.create(
-                        props.getShardTransactionIdleTimeoutInMinutes().getValue(), TimeUnit.MINUTES))
+                .maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue())
+                .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue())
+                .maxShardDataChangeListenerQueueSize(props.getMaxShardDataChangeListenerQueueSize().getValue().intValue())
+                .maxShardDataStoreExecutorQueueSize(props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue())
+                .shardTransactionIdleTimeoutInMinutes(props.getShardTransactionIdleTimeoutInMinutes().getValue())
                 .operationTimeoutInSeconds(props.getOperationTimeoutInSeconds().getValue())
                 .shardJournalRecoveryLogBatchSize(props.getShardJournalRecoveryLogBatchSize().
                         getValue().intValue())
                 .shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue())
                 .shardSnapshotDataThresholdPercentage(props.getShardSnapshotDataThresholdPercentage().getValue().intValue())
-                .shardHeartbeatIntervalInMillis(props.getShardHearbeatIntervalInMillis().getValue())
-                .shardInitializationTimeout(props.getShardInitializationTimeoutInSeconds().getValue(),
-                        TimeUnit.SECONDS)
-                .shardLeaderElectionTimeout(props.getShardLeaderElectionTimeoutInSeconds().getValue(),
-                        TimeUnit.SECONDS)
+                .shardHeartbeatIntervalInMillis(props.getShardHeartbeatIntervalInMillis().getValue())
+                .shardInitializationTimeoutInSeconds(props.getShardInitializationTimeoutInSeconds().getValue())
+                .shardLeaderElectionTimeoutInSeconds(props.getShardLeaderElectionTimeoutInSeconds().getValue())
                 .shardTransactionCommitTimeoutInSeconds(
                         props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
                 .shardTransactionCommitQueueCapacity(
@@ -67,7 +58,8 @@ public class DistributedOperationalDataStoreProviderModule extends
                 .shardIsolatedLeaderCheckIntervalInMillis(
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
-                .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
+                .transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue())
+                .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
index e2ee7373d0cfd3c4a68ec98a221fd1a121f4f13c..b775cf0a9914be9ed37f8595bbafeb662e57321e 100644 (file)
@@ -104,7 +104,7 @@ module distributed-datastore-provider {
          }
 
 
-         leaf shard-hearbeat-interval-in-millis {
+         leaf shard-heartbeat-interval-in-millis {
             default 500;
             type heartbeat-interval-type;
             description "The interval at which a shard will send a heart beat message to its remote shard.";
@@ -156,6 +156,15 @@ module distributed-datastore-provider {
                           an operation (eg transaction create).";
          }
 
+         leaf shard-batched-modification-count {
+            default 100;
+            type non-zero-uint32-type;
+            description "The number of transaction modification operations (put, merge, delete) to
+                        batch before sending to the shard transaction actor. Batching improves
+                        performance as less modifications messages are sent to the actor and thus
+                        lessens the chance that the transaction actor's mailbox queue could get full.";
+         }
+
          leaf enable-metric-capture {
             default false;
             type boolean;
@@ -181,7 +190,7 @@ module distributed-datastore-provider {
                         followers are active and term itself as isolated";
         }
 
-        leaf tx-creation-initial-rate-limit {
+        leaf transaction-creation-initial-rate-limit {
             default 100;
             type non-zero-uint32-type;
             description "The initial number of transactions per second that are allowed before the data store
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextConfigAdminOverlayTest.java
new file mode 100644 (file)
index 0000000..3693c01
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+
+/**
+ * Unit tests for DatastoreContextConfigAdminOverlay.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreContextConfigAdminOverlayTest {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void test() throws IOException {
+        BundleContext mockBundleContext = mock(BundleContext.class);
+        ServiceReference<ConfigurationAdmin> mockServiceRef = mock(ServiceReference.class);
+        ConfigurationAdmin mockConfigAdmin = mock(ConfigurationAdmin.class);
+        Configuration mockConfig = mock(Configuration.class);
+        DatastoreContextIntrospector mockIntrospector = mock(DatastoreContextIntrospector.class);
+
+        doReturn(mockServiceRef).when(mockBundleContext).getServiceReference(ConfigurationAdmin.class);
+        doReturn(mockConfigAdmin).when(mockBundleContext).getService(mockServiceRef);
+
+        doReturn(mockConfig).when(mockConfigAdmin).getConfiguration(DatastoreContextConfigAdminOverlay.CONFIG_ID);
+
+        doReturn(DatastoreContextConfigAdminOverlay.CONFIG_ID).when(mockConfig).getPid();
+
+        Dictionary<String, Object> properties = new Hashtable<>();
+        properties.put("property", "value");
+        doReturn(properties).when(mockConfig).getProperties();
+
+        try(DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
+                mockIntrospector, mockBundleContext)) {
+        }
+
+        verify(mockIntrospector).update(properties);
+
+        verify(mockBundleContext).ungetService(mockServiceRef);
+    }
+}
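
The test above mocks the OSGi calls the overlay makes. A rough sketch of that lookup
flow under the same assumptions follows; CONFIG_PID and applyProperties are placeholder
names (the placeholder PID value is not the real CONFIG_ID), and the update hook stands
in for DatastoreContextIntrospector.update().

    import java.io.IOException;
    import java.util.Dictionary;
    import org.osgi.framework.BundleContext;
    import org.osgi.framework.ServiceReference;
    import org.osgi.service.cm.Configuration;
    import org.osgi.service.cm.ConfigurationAdmin;

    final class ConfigAdminLookupSketch {
        static final String CONFIG_PID = "example.datastore.pid"; // placeholder PID

        static void readAndApply(BundleContext bundleContext) throws IOException {
            ServiceReference<ConfigurationAdmin> ref =
                    bundleContext.getServiceReference(ConfigurationAdmin.class);
            if (ref == null) {
                return; // ConfigurationAdmin not available
            }
            try {
                ConfigurationAdmin configAdmin = bundleContext.getService(ref);
                Configuration config = configAdmin.getConfiguration(CONFIG_PID);
                Dictionary<String, Object> properties = config.getProperties();
                if (properties != null) {
                    applyProperties(properties);
                }
            } finally {
                bundleContext.ungetService(ref);
            }
        }

        private static void applyProperties(Dictionary<String, Object> properties) {
            // The overlay under test passes the dictionary to the introspector here.
        }
    }
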
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java
new file mode 100644 (file)
index 0000000..745c9b3
--- /dev/null
@@ -0,0 +1,188 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+
+/**
+ * Unit tests for DatastoreContextIntrospector.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreContextIntrospectorTest {
+
+    @Test
+    public void testUpdate() {
+        DatastoreContext context = DatastoreContext.newBuilder().dataStoreType("operational").build();
+        DatastoreContextIntrospector introspector = new DatastoreContextIntrospector(context);
+
+        Dictionary<String, Object> properties = new Hashtable<>();
+        properties.put("shard-transaction-idle-timeout-in-minutes", "31");
+        properties.put("operation-timeout-in-seconds", "26");
+        properties.put("shard-transaction-commit-timeout-in-seconds", "100");
+        properties.put("shard-journal-recovery-log-batch-size", "199");
+        properties.put("shard-snapshot-batch-count", "212");
+        properties.put("shard-heartbeat-interval-in-millis", "101");
+        properties.put("shard-transaction-commit-queue-capacity", "567");
+        properties.put("shard-initialization-timeout-in-seconds", "82");
+        properties.put("shard-leader-election-timeout-in-seconds", "66");
+        properties.put("shard-isolated-leader-check-interval-in-millis", "123");
+        properties.put("shard-snapshot-data-threshold-percentage", "100");
+        properties.put("shard-election-timeout-factor", "21");
+        properties.put("shard-batched-modification-count", "901");
+        properties.put("transactionCreationInitialRateLimit", "200");
+        properties.put("MaxShardDataChangeExecutorPoolSize", "41");
+        properties.put("Max-Shard-Data-Change Executor-Queue Size", "1111");
+        properties.put(" max shard data change listener queue size", "2222");
+        properties.put("mAx-shaRd-data-STORE-executor-quEUe-size", "3333");
+        properties.put("persistent", "false");
+
+        boolean updated = introspector.update(properties);
+        assertEquals("updated", true, updated);
+        context = introspector.getContext();
+
+        assertEquals(31, context.getShardTransactionIdleTimeout().toMinutes());
+        assertEquals(26, context.getOperationTimeoutInSeconds());
+        assertEquals(100, context.getShardTransactionCommitTimeoutInSeconds());
+        assertEquals(199, context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+        assertEquals(212, context.getShardRaftConfig().getSnapshotBatchCount());
+        assertEquals(101, context.getShardRaftConfig().getHeartBeatInterval().length());
+        assertEquals(567, context.getShardTransactionCommitQueueCapacity());
+        assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
+        assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
+        assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
+        assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+        assertEquals(21, context.getShardRaftConfig().getElectionTimeoutFactor());
+        assertEquals(901, context.getShardBatchedModificationCount());
+        assertEquals(200, context.getTransactionCreationInitialRateLimit());
+        assertEquals(41, context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+        assertEquals(1111, context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize());
+        assertEquals(2222, context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
+        assertEquals(3333, context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
+        assertEquals(false, context.isPersistent());
+
+        properties.put("shard-transaction-idle-timeout-in-minutes", "32");
+        properties.put("operation-timeout-in-seconds", "27");
+        properties.put("shard-heartbeat-interval-in-millis", "102");
+        properties.put("shard-election-timeout-factor", "22");
+        properties.put("max-shard-data-change-executor-pool-size", "42");
+        properties.put("max-shard-data-store-executor-queue-size", "4444");
+        properties.put("persistent", "true");
+
+        updated = introspector.update(properties);
+        assertEquals("updated", true, updated);
+        context = introspector.getContext();
+
+        assertEquals(32, context.getShardTransactionIdleTimeout().toMinutes());
+        assertEquals(27, context.getOperationTimeoutInSeconds());
+        assertEquals(100, context.getShardTransactionCommitTimeoutInSeconds());
+        assertEquals(199, context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+        assertEquals(212, context.getShardRaftConfig().getSnapshotBatchCount());
+        assertEquals(102, context.getShardRaftConfig().getHeartBeatInterval().length());
+        assertEquals(567, context.getShardTransactionCommitQueueCapacity());
+        assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
+        assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
+        assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
+        assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+        assertEquals(22, context.getShardRaftConfig().getElectionTimeoutFactor());
+        assertEquals(200, context.getTransactionCreationInitialRateLimit());
+        assertEquals(42, context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+        assertEquals(1111, context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize());
+        assertEquals(2222, context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
+        assertEquals(4444, context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
+        assertEquals(true, context.isPersistent());
+
+        updated = introspector.update(null);
+        assertEquals("updated", false, updated);
+
+        updated = introspector.update(new Hashtable<String, Object>());
+        assertEquals("updated", false, updated);
+    }
+
+
+    @Test
+    public void testUpdateWithInvalidValues() {
+        DatastoreContext context = DatastoreContext.newBuilder().dataStoreType("operational").build();
+        DatastoreContextIntrospector introspector = new DatastoreContextIntrospector(context);
+
+        Dictionary<String, Object> properties = new Hashtable<>();
+        properties.put("shard-transaction-idle-timeout-in-minutes", "0"); // bad - must be > 0
+        properties.put("shard-journal-recovery-log-batch-size", "199");
+        properties.put("shard-transaction-commit-timeout-in-seconds", "bogus"); // bad - NaN
+        properties.put("shard-snapshot-batch-count", "212"); // good
+        properties.put("operation-timeout-in-seconds", "4"); // bad - must be >= 5
+        properties.put("shard-heartbeat-interval-in-millis", "99"); // bad - must be >= 100
+        properties.put("shard-transaction-commit-queue-capacity", "567"); // good
+        properties.put("shard-snapshot-data-threshold-percentage", "101"); // bad - must be 0-100
+        properties.put("shard-initialization-timeout-in-seconds", "-1"); // bad - must be > 0
+        properties.put("max-shard-data-change-executor-pool-size", "bogus"); // bad - NaN
+        properties.put("unknownProperty", "1"); // bad - invalid property name
+
+        boolean updated = introspector.update(properties);
+        assertEquals("updated", true, updated);
+        context = introspector.getContext();
+
+        assertEquals(DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT, context.getShardTransactionIdleTimeout());
+        assertEquals(199, context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+        assertEquals(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, context.getShardTransactionCommitTimeoutInSeconds());
+        assertEquals(212, context.getShardRaftConfig().getSnapshotBatchCount());
+        assertEquals(DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, context.getOperationTimeoutInSeconds());
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, context.getShardRaftConfig().getHeartBeatInterval().length());
+        assertEquals(567, context.getShardTransactionCommitQueueCapacity());
+        assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE,
+                context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+        assertEquals(DEFAULT_SHARD_INITIALIZATION_TIMEOUT, context.getShardInitializationTimeout());
+        assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE,
+                context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+    }
+
+    @Test
+    public void testUpdateWithDatastoreTypeSpecificProperties() {
+        Dictionary<String, Object> properties = new Hashtable<>();
+        properties.put("shard-transaction-idle-timeout-in-minutes", "22"); // global setting
+        properties.put("operational.shard-transaction-idle-timeout-in-minutes", "33"); // operational override
+        properties.put("config.shard-transaction-idle-timeout-in-minutes", "44"); // config override
+
+        properties.put("max-shard-data-change-executor-pool-size", "222"); // global setting
+        properties.put("operational.max-shard-data-change-executor-pool-size", "333"); // operational override
+        properties.put("config.max-shard-data-change-executor-pool-size", "444"); // config override
+
+        properties.put("persistent", "false"); // global setting
+        properties.put("operational.Persistent", "true"); // operational override
+
+        DatastoreContext operContext = DatastoreContext.newBuilder().dataStoreType("operational").build();
+        DatastoreContextIntrospector operIntrospector = new DatastoreContextIntrospector(operContext);
+        boolean updated = operIntrospector.update(properties);
+        assertEquals("updated", true, updated);
+        operContext = operIntrospector.getContext();
+
+        assertEquals(33, operContext.getShardTransactionIdleTimeout().toMinutes());
+        assertEquals(true, operContext.isPersistent());
+        assertEquals(333, operContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+
+        DatastoreContext configContext = DatastoreContext.newBuilder().dataStoreType("config").build();
+        DatastoreContextIntrospector configIntrospector = new DatastoreContextIntrospector(configContext);
+        updated = configIntrospector.update(properties);
+        assertEquals("updated", true, updated);
+        configContext = configIntrospector.getContext();
+
+        assertEquals(44, configContext.getShardTransactionIdleTimeout().toMinutes());
+        assertEquals(false, configContext.isPersistent());
+        assertEquals(444, configContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+    }
+}
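
The tests above exercise two observable rules of property handling: keys match the same
setting regardless of case and separators, and a "config." or "operational." prefix
overrides the unprefixed global key for that datastore type. A minimal sketch of those
two rules follows; PropertyResolutionSketch and its methods are illustrative, while the
real DatastoreContextIntrospector applies matched values reflectively to the builder.

    import java.util.Dictionary;
    import java.util.Enumeration;

    final class PropertyResolutionSketch {

        // "Max-Shard-Data-Change Executor-Queue Size" and
        // "maxShardDataChangeExecutorQueueSize" normalize to the same canonical key.
        static String canonicalize(String key) {
            return key.toLowerCase().replaceAll("[^a-z0-9]", "");
        }

        // Resolve a setting for a datastore type ("config" or "operational"),
        // preferring the type-prefixed key over the global one.
        static Object resolve(Dictionary<String, Object> properties, String dataStoreType,
                String setting) {
            Object override = lookup(properties, dataStoreType + "." + setting);
            return override != null ? override : lookup(properties, setting);
        }

        private static Object lookup(Dictionary<String, Object> properties, String wanted) {
            String canonicalWanted = canonicalize(wanted);
            Enumeration<String> keys = properties.keys();
            while (keys.hasMoreElements()) {
                String key = keys.nextElement();
                if (canonicalize(key).equals(canonicalWanted)) {
                    return properties.get(key);
                }
            }
            return null;
        }
    }
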
index d3a3a8fc2df812b88ffc500c71e17d224189b541..5197a7d991bcde78289afea0c9f39cdf29b81b45 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
-import org.junit.Before;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_CONFIGURATION_READER;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_PERSISTENT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SNAPSHOT_BATCH_COUNT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 
 public class DatastoreContextTest {
 
-    private DatastoreContext.Builder builder;
+    @Test
+    public void testNewBuilderWithDefaultSettings() {
+        DatastoreContext context = DatastoreContext.newBuilder().build();
 
-    @Before
-    public void setUp(){
-        builder = new DatastoreContext.Builder();
+        assertEquals(DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT, context.getShardTransactionIdleTimeout());
+        assertEquals(DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, context.getOperationTimeoutInSeconds());
+        assertEquals(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, context.getShardTransactionCommitTimeoutInSeconds());
+        assertEquals(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE, context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+        assertEquals(DEFAULT_SNAPSHOT_BATCH_COUNT, context.getShardRaftConfig().getSnapshotBatchCount());
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, context.getShardRaftConfig().getHeartBeatInterval().length());
+        assertEquals(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY, context.getShardTransactionCommitQueueCapacity());
+        assertEquals(DEFAULT_SHARD_INITIALIZATION_TIMEOUT.duration().toMillis(),
+                context.getShardInitializationTimeout().duration().toMillis());
+        assertEquals(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis(),
+                context.getShardLeaderElectionTimeout().duration().toMillis());
+        assertEquals(DEFAULT_PERSISTENT, context.isPersistent());
+        assertEquals(DEFAULT_CONFIGURATION_READER, context.getConfigurationReader());
+        assertEquals(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
+        assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+        assertEquals(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, context.getShardRaftConfig().getElectionTimeoutFactor());
+        assertEquals(DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, context.getTransactionCreationInitialRateLimit());
+        assertEquals(DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT, context.getShardBatchedModificationCount());
+        assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE,
+                context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+        assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE,
+                context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize());
+        assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE,
+                context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
+        assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE,
+                context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
     }
 
     @Test
-    public void testDefaults(){
-        DatastoreContext build = builder.build();
-
-        assertEquals(DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT , build.getShardTransactionIdleTimeout());
-        assertEquals(DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, build.getOperationTimeoutInSeconds());
-        assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, build.getShardTransactionCommitTimeoutInSeconds());
-        assertEquals(DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE, build.getShardRaftConfig().getJournalRecoveryLogBatchSize());
-        assertEquals(DatastoreContext.DEFAULT_SNAPSHOT_BATCH_COUNT, build.getShardRaftConfig().getSnapshotBatchCount());
-        assertEquals(DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getHeartBeatInterval().length());
-        assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY, build.getShardTransactionCommitQueueCapacity());
-        assertEquals(DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT, build.getShardInitializationTimeout());
-        assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout());
-        assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent());
-        assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader());
-        assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
-        assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
-        assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
-        assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
+    public void testNewBuilderWithCustomSettings() {
+        DatastoreContext.Builder builder = DatastoreContext.newBuilder();
+
+        builder.shardTransactionIdleTimeout(DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT.toMillis() + 1,
+                TimeUnit.MILLISECONDS);
+        builder.operationTimeoutInSeconds(DEFAULT_OPERATION_TIMEOUT_IN_SECONDS + 1);
+        builder.shardTransactionCommitTimeoutInSeconds(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS + 1);
+        builder.shardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE + 1);
+        builder.shardSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT + 1);
+        builder.shardHeartbeatIntervalInMillis(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS + 1);
+        builder.shardTransactionCommitQueueCapacity(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY + 1);
+        builder.shardInitializationTimeout(DEFAULT_SHARD_INITIALIZATION_TIMEOUT.duration().toMillis() + 1,
+                TimeUnit.MILLISECONDS);
+        builder.shardLeaderElectionTimeout(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis() + 1,
+                TimeUnit.MILLISECONDS);
+        builder.persistent(!DEFAULT_PERSISTENT);
+        builder.shardIsolatedLeaderCheckIntervalInMillis(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1);
+        builder.shardSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE + 1);
+        builder.shardElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR + 1);
+        builder.transactionCreationInitialRateLimit(DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT + 1);
+        builder.shardBatchedModificationCount(DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT + 1);
+        builder.maxShardDataChangeExecutorPoolSize(
+                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE + 1);
+        builder.maxShardDataChangeExecutorQueueSize(
+                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE + 1);
+        builder.maxShardDataChangeListenerQueueSize(
+                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE + 1);
+        builder.maxShardDataStoreExecutorQueueSize(
+                InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1);
+
+        DatastoreContext context = builder.build();
+
+        verifyCustomSettings(context);
+
+        builder = DatastoreContext.newBuilderFrom(context);
+
+        DatastoreContext newContext = builder.build();
+
+        verifyCustomSettings(newContext);
+
+        Assert.assertNotSame(context, newContext);
     }
 
-}
\ No newline at end of file
+    private void verifyCustomSettings(DatastoreContext context) {
+        assertEquals(DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT.toMillis() + 1,
+                context.getShardTransactionIdleTimeout().toMillis());
+        assertEquals(DEFAULT_OPERATION_TIMEOUT_IN_SECONDS + 1, context.getOperationTimeoutInSeconds());
+        assertEquals(DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS + 1,
+                context.getShardTransactionCommitTimeoutInSeconds());
+        assertEquals(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE + 1,
+                context.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+        assertEquals(DEFAULT_SNAPSHOT_BATCH_COUNT + 1, context.getShardRaftConfig().getSnapshotBatchCount());
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS + 1,
+                context.getShardRaftConfig().getHeartBeatInterval().length());
+        assertEquals(DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY + 1, context.getShardTransactionCommitQueueCapacity());
+        assertEquals(DEFAULT_SHARD_INITIALIZATION_TIMEOUT.duration().toMillis() + 1,
+                context.getShardInitializationTimeout().duration().toMillis());
+        assertEquals(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis() + 1,
+                context.getShardLeaderElectionTimeout().duration().toMillis());
+        assertEquals(!DEFAULT_PERSISTENT, context.isPersistent());
+        assertEquals(DEFAULT_CONFIGURATION_READER, context.getConfigurationReader());
+        assertEquals(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1,
+                context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
+        assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE + 1,
+                context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+        assertEquals(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR + 1, context.getShardRaftConfig().getElectionTimeoutFactor());
+        assertEquals(DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT + 1, context.getTransactionCreationInitialRateLimit());
+        assertEquals(DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT + 1,
+                context.getShardBatchedModificationCount());
+        assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE + 1,
+                context.getDataStoreProperties().getMaxDataChangeExecutorPoolSize());
+        assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE + 1,
+                context.getDataStoreProperties().getMaxDataChangeExecutorQueueSize());
+        assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE + 1,
+                context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
+        assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1,
+                context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
+    }
+}
index 66fa876277ca97a164a5b433fa3e11ecb50e69a7..4ec035ee3b52308e6390d9a2e4dece6703438318 100644 (file)
@@ -25,6 +25,7 @@ public class DistributedDataStoreTest extends AbstractActorTest {
         schemaContext = TestModel.createTestContext();
 
         doReturn(schemaContext).when(actorContext).getSchemaContext();
+        doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
     }
 
     @Test
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java
new file mode 100644 (file)
index 0000000..e7afe26
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import java.util.concurrent.Semaphore;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+
+/**
+ * Unit tests for OperationCompleter.
+ *
+ * @author Thomas Pantelis
+ */
+public class OperationCompleterTest {
+
+    @Test
+    public void testOnComplete() throws Exception {
+        int permits = 10;
+        Semaphore operationLimiter = new Semaphore(permits);
+        operationLimiter.acquire(permits);
+        int availablePermits = 0;
+
+        OperationCompleter completer = new OperationCompleter(operationLimiter);
+
+        completer.onComplete(null, new DataExistsReply(true));
+        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+        completer.onComplete(null, new DataExistsReply(true));
+        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+        completer.onComplete(null, new IllegalArgumentException());
+        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+        completer.onComplete(null, new BatchedModificationsReply(4));
+        availablePermits += 4;
+        assertEquals("availablePermits", availablePermits, operationLimiter.availablePermits());
+    }
+}
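
As a plain-Java illustration of the throttling behavior asserted above (one permit back
per completed operation, getNumBatched() permits back for a batched reply), detached
from the actual OperationCompleter, which plugs into Akka's OnComplete callback:

    import java.util.concurrent.Semaphore;

    final class PermitReleasingCompleterSketch {
        private final Semaphore operationLimiter;

        PermitReleasingCompleterSketch(Semaphore operationLimiter) {
            this.operationLimiter = operationLimiter;
        }

        // Stand-in for BatchedModificationsReply, which exposes getNumBatched().
        interface BatchedReply {
            int getNumBatched();
        }

        void onComplete(Throwable failure, Object reply) {
            if (reply instanceof BatchedReply) {
                // Each batched modification consumed a permit up front; return them all.
                operationLimiter.release(((BatchedReply) reply).getNumBatched());
            } else {
                // Any other outcome, success or failure, frees exactly one permit.
                operationLimiter.release();
            }
        }
    }
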
index 58cec67a2d6cce1f30e3cdf41fdaf7b7d23185a7..1d1b08b5f8d57db68e9690cf1045fec5cff10bcc 100644 (file)
@@ -79,8 +79,8 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
-                            DataStoreVersions.BASE_HELIUM_VERSION), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.BASE_HELIUM_VERSION).
+                        toSerializable(), getRef());
 
             expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
 
@@ -153,9 +153,11 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                    DataStoreVersions.BASE_HELIUM_VERSION).toSerializable(), getRef());
 
-            expectMsgClass(duration, WriteDataReply.class);
+            expectMsgClass(duration, WriteDataReply.INSTANCE.toSerializable(
+                    DataStoreVersions.BASE_HELIUM_VERSION).getClass());
 
             // Ready the Tx
 
index 851fb0114b3a64c7d211a5c6375c14c41e715e69..c6b5cb44026f42690fffc89289a50c13fe1c59aa 100644 (file)
@@ -14,10 +14,14 @@ import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
@@ -46,11 +50,12 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.Duration;
 
 public class ShardTransactionTest extends AbstractActorTest {
 
@@ -250,8 +255,8 @@ public class ShardTransactionTest extends AbstractActorTest {
                     "testOnReceiveWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
-                            DataStoreVersions.HELIUM_2_VERSION), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
+                        toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
 
@@ -259,7 +264,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
             // unserialized write
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
@@ -293,8 +298,8 @@ public class ShardTransactionTest extends AbstractActorTest {
                     "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
-                            DataStoreVersions.HELIUM_2_VERSION), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
+                        toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
 
@@ -302,7 +307,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
             //unserialized merge
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
@@ -335,20 +340,73 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
                     "testDeleteData");
 
-            transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(
-                    DataStoreVersions.HELIUM_2_VERSION), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
+                    toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
 
             assertModification(transaction, DeleteModification.class);
 
             //unserialized
-            transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
 
             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveBatchedModifications() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+            final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
+
+            YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+            NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                    withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+            YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+            NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+            YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+            BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+            batched.addModification(new WriteModification(writePath, writeData));
+            batched.addModification(new MergeModification(mergePath, mergeData));
+            batched.addModification(new DeleteModification(deletePath));
+
+            transaction.tell(batched, getRef());
+
+            BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+            assertEquals("getNumBatched", 3, reply.getNumBatched());
+
+            JavaTestKit verification = new JavaTestKit(getSystem());
+            transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
+
+            CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
+                        GetCompositeModificationReply.class).getModification();
+
+            assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
+
+            WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
+            assertEquals("getPath", writePath, write.getPath());
+            assertEquals("getData", writeData, write.getData());
+
+            MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
+            assertEquals("getPath", mergePath, merge.getPath());
+            assertEquals("getData", mergeData, merge.getData());
+
+            DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
+            assertEquals("getPath", deletePath, delete.getPath());
+
+            InOrder inOrder = Mockito.inOrder(mockWriteTx);
+            inOrder.verify(mockWriteTx).write(writePath, writeData);
+            inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
+            inOrder.verify(mockWriteTx).delete(deletePath);
+        }};
+    }
 
     @Test
     public void testOnReceiveReadyTransaction() throws Exception {
@@ -463,15 +521,15 @@ public class ShardTransactionTest extends AbstractActorTest {
                 DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
-        transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(
-                DataStoreVersions.CURRENT_VERSION), ActorRef.noSender());
+        transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
+                toSerializable(), ActorRef.noSender());
     }
 
     @Test
     public void testShardTransactionInactivity() {
 
         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
-                Duration.create(500, TimeUnit.MILLISECONDS)).build();
+                500, TimeUnit.MILLISECONDS).build();
 
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
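
The new testOnReceiveBatchedModifications above implies the shard-side handling: each
queued modification is applied to the backing DOMStoreWriteTransaction in arrival order
and the reply carries the number applied. A small stand-alone sketch of that loop
follows; WriteTx and Op are stand-ins for DOMStoreWriteTransaction and Modification,
not the real types.

    import java.util.List;

    final class BatchedApplySketch {

        interface WriteTx {                          // stand-in for DOMStoreWriteTransaction
            void write(String path, Object data);
            void merge(String path, Object data);
            void delete(String path);
        }

        interface Op {                               // stand-in for a single Modification
            void applyTo(WriteTx tx);
        }

        // Returns the count a BatchedModificationsReply would carry back to the caller.
        static int applyBatch(WriteTx tx, List<Op> modifications) {
            for (Op modification : modifications) {
                modification.applyTo(tx);            // write, merge or delete, in order
            }
            return modifications.size();
        }
    }
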
index 23c3a82a38255e9bfff2708583c4370ccb6ccf9a..88ab0dd292b4894f0eac4c8ae782452d9d696471 100644 (file)
@@ -43,6 +43,7 @@ public class TransactionChainProxyTest extends AbstractActorTest{
         actorContext.setSchemaContext(schemaContext);
 
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+        doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext();
     }
 
     @SuppressWarnings("resource")
index fa2f9187d6059f1585a1475531556d8563db3c5a..6573308c12100914badbedc5d0296b90e096a2b7 100644 (file)
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -39,13 +40,18 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -60,6 +66,11 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -71,6 +82,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -102,7 +114,10 @@ public class TransactionProxyTest {
     @Mock
     private ClusterWrapper mockClusterWrapper;
 
-    String memberName = "mock-member";
+    private final String memberName = "mock-member";
+
+    private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
+            shardBatchedModificationCount(1);
 
     @BeforeClass
     public static void setUpClass() throws IOException {
@@ -126,15 +141,13 @@ public class TransactionProxyTest {
 
         schemaContext = TestModel.createTestContext();
 
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
-
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
-        doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+        doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         ShardStrategyFactory.setConfiguration(configuration);
@@ -187,11 +200,15 @@ public class TransactionProxyTest {
     }
 
     private ReadData eqSerializedReadData() {
+        return eqSerializedReadData(TestModel.TEST_PATH);
+    }
+
+    private ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
             @Override
             public boolean matches(Object argument) {
                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+                       ReadData.fromSerializable(argument).getPath().equals(path);
             }
         };
 
@@ -210,23 +227,13 @@ public class TransactionProxyTest {
         return argThat(matcher);
     }
 
-    private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
-        return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
-            final int transactionVersion) {
+    private WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
             @Override
             public boolean matches(Object argument) {
-                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
-                        WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
-                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
-                           ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
-
+                if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) {
                     WriteData obj = WriteData.fromSerializable(argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                           obj.getData().equals(nodeToWrite);
+                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
                 }
 
                 return false;
@@ -236,39 +243,13 @@ public class TransactionProxyTest {
         return argThat(matcher);
     }
 
-    private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
-        ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(argument instanceof WriteData) {
-                    WriteData obj = (WriteData) argument;
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                        obj.getData().equals(nodeToWrite);
-                }
-                return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
-        return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
-            final int transactionVersion) {
+    private MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
             @Override
             public boolean matches(Object argument) {
-                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
-                        MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
-                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
-                           ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
-
+                if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) {
                     MergeData obj = MergeData.fromSerializable(argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                           obj.getData().equals(nodeToWrite);
+                    return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
                 }
 
                 return false;
@@ -278,41 +259,12 @@ public class TransactionProxyTest {
         return argThat(matcher);
     }
 
-    private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
-        ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
-            @Override
-            public boolean matches(Object argument) {
-                if(argument instanceof MergeData) {
-                    MergeData obj = ((MergeData) argument);
-                    return obj.getPath().equals(TestModel.TEST_PATH) &&
-                        obj.getData().equals(nodeToWrite);
-                }
-
-               return false;
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-    private DeleteData eqSerializedDeleteData() {
-        ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
-            @Override
-            public boolean matches(Object argument) {
-                return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
-                       DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
-            }
-        };
-
-        return argThat(matcher);
-    }
-
-        private DeleteData eqDeleteData() {
+    private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
             @Override
             public boolean matches(Object argument) {
-                return argument instanceof DeleteData &&
-                    ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
+                return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) &&
+                       DeleteData.fromSerializable(argument).getPath().equals(expPath);
             }
         };
 
@@ -329,7 +281,7 @@ public class TransactionProxyTest {
 
     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
             short transactionVersion) {
-        return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+        return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable());
     }
 
     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
@@ -337,7 +289,7 @@ public class TransactionProxyTest {
     }
 
     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(data));
+        return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
     }
 
     private Future<Object> dataExistsSerializedReply(boolean exists) {
@@ -348,48 +300,41 @@ public class TransactionProxyTest {
         return Futures.successful(new DataExistsReply(exists));
     }
 
-    private Future<Object> writeSerializedDataReply(short version) {
-        return Futures.successful(new WriteDataReply().toSerializable(version));
-    }
-
-    private Future<Object> writeSerializedDataReply() {
-        return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
-    }
-
-    private Future<WriteDataReply> writeDataReply() {
-        return Futures.successful(new WriteDataReply());
-    }
-
-    private Future<Object> mergeSerializedDataReply(short version) {
-        return Futures.successful(new MergeDataReply().toSerializable(version));
-    }
-
-    private Future<Object> mergeSerializedDataReply() {
-        return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+    private Future<BatchedModificationsReply> batchedModificationsReply(int count) {
+        return Futures.successful(new BatchedModificationsReply(count));
     }
 
     private Future<Object> incompleteFuture(){
         return mock(Future.class);
     }
 
-    private Future<MergeDataReply> mergeDataReply() {
-        return Futures.successful(new MergeDataReply());
+    private ActorSelection actorSelection(ActorRef actorRef) {
+        return getSystem().actorSelection(actorRef.path());
+    }
+
+    private void expectBatchedModifications(ActorRef actorRef, int count) {
+        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
-    private Future<Object> deleteSerializedDataReply(short version) {
-        return Futures.successful(new DeleteDataReply().toSerializable(version));
+    private void expectBatchedModifications(int count) {
+        doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
     }
 
-    private Future<Object> deleteSerializedDataReply() {
-        return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+    private void expectIncompleteBatchedModifications() {
+        doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
     }
 
-    private Future<DeleteDataReply> deleteDataReply() {
-        return Futures.successful(new DeleteDataReply());
+    private void expectReadyTransaction(ActorRef actorRef) {
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
     }
 
-    private ActorSelection actorSelection(ActorRef actorRef) {
-        return getSystem().actorSelection(actorRef.path());
+    private void expectFailedBatchedModifications(ActorRef actorRef) {
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
     private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
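The eq*(...) and expect*(...) helpers above lean on two Mockito idioms: custom ArgumentMatchers handed to argThat(), and doReturn(...).when(...) stubbing of the async call. A minimal self-contained sketch combining both follows, assuming the Mockito 1.x-era API this test already uses; AsyncService and the string messages are hypothetical stand-ins, not the controller's classes.

    import static org.mockito.Matchers.argThat;
    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    import org.mockito.ArgumentMatcher;

    public class MatcherStubbingSketch {

        // Hypothetical stand-in for ActorContext.executeOperationAsync(selection, message).
        interface AsyncService {
            String execute(Object message);
        }

        // Matches only messages that are Strings starting with the given prefix.
        private static ArgumentMatcher<Object> messageStartingWith(final String prefix) {
            return new ArgumentMatcher<Object>() {
                @Override
                public boolean matches(Object argument) {
                    return argument instanceof String && ((String) argument).startsWith(prefix);
                }
            };
        }

        public static void main(String[] args) {
            AsyncService service = mock(AsyncService.class);

            // Stub only the invocations whose argument satisfies the matcher.
            doReturn("batched-reply").when(service).execute(argThat(messageStartingWith("Batched")));

            System.out.println(service.execute("BatchedModifications")); // prints batched-reply
            System.out.println(service.execute("ReadData"));             // prints null (unstubbed)

            // Verifies exactly one invocation matched the custom matcher.
            verify(service).execute(argThat(messageStartingWith("Batched")));
        }
    }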
@@ -446,8 +391,7 @@ public class TransactionProxyTest {
     public void testRead() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
@@ -476,8 +420,7 @@ public class TransactionProxyTest {
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
     }
@@ -489,8 +432,7 @@ public class TransactionProxyTest {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
     }
@@ -541,21 +483,19 @@ public class TransactionProxyTest {
 
     @Test(expected = TestException.class)
     public void testReadWithPriorRecordingOperationFailure() throws Throwable {
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
+                when(mockActorContext).getDatastoreContext();
+
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        expectFailedBatchedModifications(actorRef);
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -575,14 +515,12 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
 
@@ -590,16 +528,19 @@ public class TransactionProxyTest {
                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
         assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
-
         assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+
+        InOrder inOrder = Mockito.inOrder(mockActorContext);
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
     }
 
     @Test(expected=IllegalStateException.class)
     public void testReadPreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
-
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
         transactionProxy.read(TestModel.TEST_PATH);
     }
 
@@ -625,8 +566,7 @@ public class TransactionProxyTest {
     public void testExists() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
@@ -673,23 +613,21 @@ public class TransactionProxyTest {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
     }
 
     @Test(expected = TestException.class)
     public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
+                when(mockActorContext).getDatastoreContext();
+
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        expectFailedBatchedModifications(actorRef);
 
         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
@@ -715,28 +653,30 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", true, exists);
+
+        InOrder inOrder = Mockito.inOrder(mockActorContext);
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedDataExists());
     }
 
     @Test(expected=IllegalStateException.class)
     public void testExistsPreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
-
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
         transactionProxy.exists(TestModel.TEST_PATH);
     }
 
@@ -757,7 +697,7 @@ public class TransactionProxyTest {
                     // Expected
                 }
             } else {
-                assertEquals("Recording operation Future result type", expResultType,
+                assertEquals(String.format("Recording operation %d Future result type", i + 1), expResultType,
                              Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
             }
         }
@@ -769,19 +709,20 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     @Test
@@ -796,10 +737,10 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
@@ -833,33 +774,28 @@ public class TransactionProxyTest {
             throw caughtEx.get();
         }
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     @Test(expected=IllegalStateException.class)
     public void testWritePreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
-
-        transactionProxy.write(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
     }
 
     @Test(expected=IllegalStateException.class)
     public void testWriteAfterReadyPreConditionCheck() {
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.ready();
 
-        transactionProxy.write(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
     }
 
     @Test
@@ -868,37 +804,40 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     @Test
     public void testDelete() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
-        doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
-        verify(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDeleteData());
+        // This sends the batched modification.
+        transactionProxy.ready();
+
+        verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                DeleteDataReply.class);
+                BatchedModificationsReply.class);
     }
 
     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
@@ -935,14 +874,10 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
+        expectReadyTransaction(actorRef);
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -955,9 +890,12 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.class);
+                BatchedModificationsReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
+                isA(BatchedModifications.class));
     }
 
     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
@@ -969,14 +907,16 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+        doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
 
-        doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
+        doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+        doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
+                executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
+
+        expectReadyTransaction(actorRef);
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
@@ -993,6 +933,8 @@ public class TransactionProxyTest {
 
         transactionProxy.merge(TestModel.TEST_PATH, testNode);
 
+        transactionProxy.delete(TestModel.TEST_PATH);
+
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
@@ -1000,7 +942,8 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
+                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class,
+                ShardTransactionMessages.DeleteDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
@@ -1029,21 +972,13 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        expectFailedBatchedModifications(actorRef);
 
-        doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+        expectReadyTransaction(actorRef);
 
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
-
-        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1055,8 +990,7 @@ public class TransactionProxyTest {
 
         verifyCohortFutures(proxy, TestException.class);
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.class, TestException.class);
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
     }
 
     @Test
@@ -1065,15 +999,13 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)),
                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1084,7 +1016,7 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.class);
+                BatchedModificationsReply.class);
 
         verifyCohortFutures(proxy, TestException.class);
     }
@@ -1095,8 +1027,7 @@ public class TransactionProxyTest {
         doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
                 mockActorContext).findPrimaryShardAsync(anyString());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -1121,15 +1052,13 @@ public class TransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        expectBatchedModifications(actorRef, 1);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)),
                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1160,8 +1089,7 @@ public class TransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -1197,9 +1125,7 @@ public class TransactionProxyTest {
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
         CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-            .setTransactionId("txn-1")
-            .setTransactionActorPath(actorPath)
-            .build();
+            .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
             executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
@@ -1240,7 +1166,7 @@ public class TransactionProxyTest {
     }
 
     @Test
-    public void testLocalTxActorWrite() throws Exception {
+    public void testLocalTxActorReady() throws Exception {
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
 
@@ -1251,48 +1177,26 @@ public class TransactionProxyTest {
             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-            .setTransactionId("txn-1")
-            .setTransactionActorPath(actorPath)
-            .build();
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+            setTransactionId("txn-1").setTransactionActorPath(actorPath).
+            setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
-        executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+            executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
                 eqCreateTransaction(memberName, WRITE_ONLY));
 
         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
 
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqWriteData(nodeToWrite));
+        doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
+                any(ActorSelection.class), isA(BatchedModifications.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        verify(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqWriteData(nodeToWrite));
-
-        //testing local merge
-        doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqMergeData(nodeToWrite));
-
-        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
-        verify(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqMergeData(nodeToWrite));
-
-
-        //testing local delete
-        doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
-            any(ActorSelection.class), eqDeleteData());
 
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-            WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
+                BatchedModificationsReply.class);
 
         // testing ready
         doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
@@ -1333,10 +1237,9 @@ public class TransactionProxyTest {
         }
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-                .setTransactionId("txn-1")
-                .setTransactionActorPath(actorPath)
-                .build();
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+                setTransactionId("txn-1").setTransactionActorPath(actorPath).
+                setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
@@ -1352,9 +1255,9 @@ public class TransactionProxyTest {
 
         long end = System.nanoTime();
 
-        Assert.assertTrue(String.format("took less time than expected %s was %s",
-                TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
-                (end-start)), (end - start) > TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+                expected, (end-start)), (end - start) > expected);
 
     }
 
@@ -1380,10 +1283,9 @@ public class TransactionProxyTest {
         }
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
-                .setTransactionId("txn-1")
-                .setTransactionActorPath(actorPath)
-                .build();
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+                setTransactionId("txn-1").setTransactionActorPath(actorPath).
+                setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
@@ -1399,9 +1301,9 @@ public class TransactionProxyTest {
 
         long end = System.nanoTime();
 
-        Assert.assertTrue(String.format("took more time than expected %s was %s",
-                TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
-                (end-start)), (end - start) <= TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+                expected, (end-start)), (end - start) <= expected);
     }
 
     public void testWriteThrottling(boolean shardFound){
@@ -1411,8 +1313,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectBatchedModifications(2);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1428,15 +1329,13 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectIncompleteBatchedModifications();
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
             }
         });
-
     }
 
     @Test
@@ -1447,8 +1346,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectBatchedModifications(2);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -1466,15 +1364,13 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+                expectBatchedModifications(2);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
             }
         });
-
     }
 
     @Test
@@ -1485,8 +1381,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+                expectIncompleteBatchedModifications();
 
                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
 
@@ -1503,8 +1398,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+                expectBatchedModifications(2);
 
                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
 
@@ -1520,8 +1414,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+                expectBatchedModifications(2);
 
                 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
 
@@ -1537,8 +1430,7 @@ public class TransactionProxyTest {
         throttleOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqDeleteData());
+                expectIncompleteBatchedModifications();
 
                 transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -1554,8 +1446,7 @@ public class TransactionProxyTest {
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqDeleteData());
+                expectBatchedModifications(2);
 
                 transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -1569,8 +1460,7 @@ public class TransactionProxyTest {
         completeOperation(new TransactionProxyOperation() {
             @Override
             public void run(TransactionProxy transactionProxy) {
-                doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqDeleteData());
+                expectBatchedModifications(2);
 
                 transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -1688,8 +1578,7 @@ public class TransactionProxyTest {
             public void run(TransactionProxy transactionProxy) {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+                expectBatchedModifications(1);
 
                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
                         any(ActorSelection.class), any(ReadyTransaction.class));
@@ -1710,11 +1599,7 @@ public class TransactionProxyTest {
                 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
                 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
 
-                doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(nodeToWrite));
-
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), eqWriteData(carsNode));
+                expectBatchedModifications(2);
 
                 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
                         any(ActorSelection.class), any(ReadyTransaction.class));
@@ -1727,4 +1612,203 @@ public class TransactionProxyTest {
             }
         }, 2, true);
     }
+
+    @Test
+    public void testModificationOperationBatching() throws Throwable {
+        int shardBatchedModificationCount = 3;
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
+                when(mockActorContext).getDatastoreContext();
+
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        expectBatchedModifications(actorRef, shardBatchedModificationCount);
+
+        expectReadyTransaction(actorRef);
+
+        YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
+        NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+        YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
+        NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+        YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
+        YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.write(writePath1, writeNode1);
+        transactionProxy.write(writePath2, writeNode2);
+        transactionProxy.delete(deletePath1);
+        transactionProxy.merge(mergePath1, mergeNode1);
+        transactionProxy.merge(mergePath2, mergeNode2);
+        transactionProxy.write(writePath3, writeNode3);
+        transactionProxy.merge(mergePath3, mergeNode3);
+        transactionProxy.delete(deletePath2);
+
+        // This sends the last batch.
+        transactionProxy.ready();
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+                new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
+
+        verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+                new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
+
+        verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
+                new DeleteModification(deletePath2));
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+    }
+
+    @Test
+    public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
+        int shardBatchedModificationCount = 10;
+        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
+                when(mockActorContext).getDatastoreContext();
+
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        expectBatchedModifications(actorRef, shardBatchedModificationCount);
+
+        YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+        YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
+        NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+        YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
+
+        doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+
+        doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+
+        doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedDataExists());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.write(writePath1, writeNode1);
+        transactionProxy.write(writePath2, writeNode2);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
+                get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
+
+        transactionProxy.merge(mergePath1, mergeNode1);
+        transactionProxy.merge(mergePath2, mergeNode2);
+
+        readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
+
+        transactionProxy.delete(deletePath);
+
+        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+        assertEquals("Exists response", true, exists);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+                new WriteModification(writePath2, writeNode2));
+
+        verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+                new MergeModification(mergePath2, mergeNode2));
+
+        verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
+
+        InOrder inOrder = Mockito.inOrder(mockActorContext);
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+        inOrder.verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedDataExists());
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+    }
+
+    private List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
+        ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
+                ArgumentCaptor.forClass(BatchedModifications.class);
+        verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
+                eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
+
+        List<BatchedModifications> batchedModifications = filterCaptured(
+                batchedModificationsCaptor, BatchedModifications.class);
+        return batchedModifications;
+    }
+
+    private <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
+        List<T> captured = new ArrayList<>();
+        for(T c: captor.getAllValues()) {
+            if(type.isInstance(c)) {
+                captured.add(c);
+            }
+        }
+
+        return captured;
+    }
+
+    private void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), expected);
+    }
+
+    private void verifyBatchedModifications(Object message, Modification... expected) {
+        assertEquals("Message type", BatchedModifications.class, message.getClass());
+        BatchedModifications batchedModifications = (BatchedModifications)message;
+        assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+        for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
+            Modification actual = batchedModifications.getModifications().get(i);
+            assertEquals("Modification type", expected[i].getClass(), actual.getClass());
+            assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
+                    ((AbstractModification)actual).getPath());
+            if(actual instanceof WriteModification) {
+                assertEquals("getData", ((WriteModification)expected[i]).getData(),
+                        ((WriteModification)actual).getData());
+            }
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java
new file mode 100644 (file)
index 0000000..15d2eea
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for BatchedModifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModificationsTest {
+
+    @Test
+    public void testSerialization() {
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+        YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+        BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+        batched.addModification(new WriteModification(writePath, writeData));
+        batched.addModification(new MergeModification(mergePath, mergeData));
+        batched.addModification(new DeleteModification(deletePath));
+
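+        // Round-trip the serializable form through Java serialization and verify that all
+        // three modifications survive with their paths, data and version intact.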
+        BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
+                (Serializable) batched.toSerializable());
+
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+
+        assertEquals("getModifications size", 3, clone.getModifications().size());
+
+        WriteModification write = (WriteModification)clone.getModifications().get(0);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion());
+        assertEquals("getPath", writePath, write.getPath());
+        assertEquals("getData", writeData, write.getData());
+
+        MergeModification merge = (MergeModification)clone.getModifications().get(1);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion());
+        assertEquals("getPath", mergePath, merge.getPath());
+        assertEquals("getData", mergeData, merge.getData());
+
+        DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
+        assertEquals("getPath", deletePath, delete.getPath());
+    }
+
+    @Test
+    public void testBatchedModificationsReplySerialization() {
+        BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
+                (Serializable) new BatchedModificationsReply(100).toSerializable());
+        assertEquals("getNumBatched", 100, clone.getNumBatched());
+    }
+}
index e950b78ab7d609a6b0aac6e34431cbb5a3f36584..97bade152e0484dd832aacf4c962cc58b6c77d11 100644 (file)
@@ -22,21 +22,22 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  *
  * @author Thomas Pantelis
  */
+@Deprecated
 public class DeleteDataTest {
 
     @Test
     public void testSerialization() {
         YangInstanceIdentifier path = TestModel.TEST_PATH;
 
-        DeleteData expected = new DeleteData(path);
+        DeleteData expected = new DeleteData(path, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", DeleteData.class, serialized.getClass());
         assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)serialized).getVersion());
 
         Object clone = SerializationUtils.clone((Serializable) serialized);
-        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)clone).getVersion());
         DeleteData actual = DeleteData.fromSerializable(clone);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getPath", expected.getPath(), actual.getPath());
     }
 
@@ -58,9 +59,9 @@ public class DeleteDataTest {
     public void testSerializationWithHeliumR1Version() throws Exception {
         YangInstanceIdentifier path = TestModel.TEST_PATH;
 
-        DeleteData expected = new DeleteData(path);
+        DeleteData expected = new DeleteData(path, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.DeleteData.class, serialized.getClass());
 
         DeleteData actual = DeleteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
index 5b40afdff8288ff714cc4f9b4a398e2f2724369b..011d22798e628ee4800f980a4718667781ffdb16 100644 (file)
@@ -14,6 +14,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
+@Deprecated
 public class MergeDataTest {
 
     @Test
@@ -23,15 +24,15 @@ public class MergeDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        MergeData expected = new MergeData(path, data);
+        MergeData expected = new MergeData(path, data, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", MergeData.class, serialized.getClass());
         assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion());
 
         Object clone = SerializationUtils.clone((Serializable) serialized);
-        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion());
         MergeData actual = MergeData.fromSerializable(clone);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getPath", expected.getPath(), actual.getPath());
         assertEquals("getData", expected.getData(), actual.getData());
     }
@@ -58,9 +59,9 @@ public class MergeDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        MergeData expected = new MergeData(path, data);
+        MergeData expected = new MergeData(path, data, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass());
 
         MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
index 8ce73296c14cd0af90491ae4f87e99fa85637c5e..7ad45a61702607cdaf03a61b23573575d54830cd 100644 (file)
@@ -32,13 +32,14 @@ public class ReadDataReplyTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        ReadDataReply expected = new ReadDataReply(data);
+        ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ReadDataReply.class, serialized.getClass());
 
         ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
                 (Serializable) serialized));
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
     }
 
@@ -60,9 +61,9 @@ public class ReadDataReplyTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        ReadDataReply expected = new ReadDataReply(data);
+        ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass());
 
         ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
index 90a76f229e1ddd045c084e56b1b854a518b43a92..8148c9ca9528fc709ad1ecaf3febbda265bc033d 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableCo
  *
  * @author Thomas Pantelis
  */
+@Deprecated
 public class WriteDataTest {
 
     @Test
@@ -35,15 +36,15 @@ public class WriteDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteData expected = new WriteData(path, data);
+        WriteData expected = new WriteData(path, data, DataStoreVersions.CURRENT_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", WriteData.class, serialized.getClass());
         assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion());
 
         Object clone = SerializationUtils.clone((Serializable) serialized);
-        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion());
         WriteData actual = WriteData.fromSerializable(clone);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
         assertEquals("getPath", expected.getPath(), actual.getPath());
         assertEquals("getData", expected.getData(), actual.getData());
     }
@@ -69,9 +70,9 @@ public class WriteDataTest {
                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
                 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteData expected = new WriteData(path, data);
+        WriteData expected = new WriteData(path, data, DataStoreVersions.HELIUM_1_VERSION);
 
-        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        Object serialized = expected.toSerializable();
         assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass());
 
         WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
index b9d44b2586f75a34b163322f9888b94419dd7cee..e2acaa8d10789d2d1e1f2b5a6f127b5a1c7a3d9a 100644 (file)
@@ -7,6 +7,7 @@ import com.google.common.base.Stopwatch;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -53,17 +54,22 @@ public class MutableCompositeModificationTest extends AbstractModificationTest {
 
         MutableCompositeModification clone = (MutableCompositeModification) SerializationUtils.clone(compositeModification);
 
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+
         assertEquals("getModifications size", 3, clone.getModifications().size());
 
         WriteModification write = (WriteModification)clone.getModifications().get(0);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion());
         assertEquals("getPath", writePath, write.getPath());
         assertEquals("getData", writeData, write.getData());
 
         MergeModification merge = (MergeModification)clone.getModifications().get(1);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion());
         assertEquals("getPath", mergePath, merge.getPath());
         assertEquals("getData", mergeData, merge.getData());
 
         DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
         assertEquals("getPath", deletePath, delete.getPath());
     }
 
index 67fa0960cbcb96cc2f617148aab2c7a7a7bab71d..9761ed8615a763df20ec6763defc69b55395a120 100644 (file)
@@ -31,7 +31,10 @@ public class TestModel {
   private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
 
   public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
-  public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build();
+  public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+          node(OUTER_LIST_QNAME).build();
+  public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+          node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
   public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
   public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
 
index 7a879f33779f1a41343d3cc48d39064c461f57b9..03aa31680b97813ef8f6e5a80ee93b5a0d52e6fc 100644 (file)
@@ -60,17 +60,16 @@ public class NormalizedNodeJsonBodyWriter implements MessageBodyWriter<Normalize
             final MediaType mediaType, final MultivaluedMap<String, Object> httpHeaders, final OutputStream entityStream)
                     throws IOException, WebApplicationException {
         NormalizedNode<?, ?> data = t.getData();
-        InstanceIdentifierContext context = t.getInstanceIdentifierContext();
-        DataSchemaNode schema = context.getSchemaNode();
+        final InstanceIdentifierContext context = t.getInstanceIdentifierContext();
+        final DataSchemaNode schema = context.getSchemaNode();
         SchemaPath path = context.getSchemaNode().getPath();
-        OutputStreamWriter outputWriter = new OutputStreamWriter(entityStream, Charsets.UTF_8);
+        final OutputStreamWriter outputWriter = new OutputStreamWriter(entityStream, Charsets.UTF_8);
         if (data == null) {
             throw new RestconfDocumentedException(Response.Status.NOT_FOUND);
         }
 
         boolean isDataRoot = false;
         URI initialNs = null;
-        outputWriter.write('{');
         if (SchemaPath.ROOT.equals(path)) {
             isDataRoot = true;
         } else {
@@ -80,8 +79,8 @@ public class NormalizedNodeJsonBodyWriter implements MessageBodyWriter<Normalize
         if(!schema.isAugmenting() && !(schema instanceof SchemaContext)) {
             initialNs = schema.getQName().getNamespace();
         }
-        NormalizedNodeStreamWriter jsonWriter = JSONNormalizedNodeStreamWriter.create(context.getSchemaContext(),path,initialNs,outputWriter);
-        NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(jsonWriter);
+        final NormalizedNodeStreamWriter jsonWriter = JSONNormalizedNodeStreamWriter.create(context.getSchemaContext(),path,initialNs,outputWriter);
+        final NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(jsonWriter);
         if(isDataRoot) {
             writeDataRoot(outputWriter,nnWriter,(ContainerNode) data);
         } else {
@@ -91,14 +90,13 @@ public class NormalizedNodeJsonBodyWriter implements MessageBodyWriter<Normalize
             nnWriter.write(data);
         }
         nnWriter.flush();
-        outputWriter.write('}');
         outputWriter.flush();
     }
 
-    private void writeDataRoot(OutputStreamWriter outputWriter, NormalizedNodeWriter nnWriter, ContainerNode data) throws IOException {
-        Iterator<DataContainerChild<? extends PathArgument, ?>> iterator = data.getValue().iterator();
+    private void writeDataRoot(final OutputStreamWriter outputWriter, final NormalizedNodeWriter nnWriter, final ContainerNode data) throws IOException {
+        final Iterator<DataContainerChild<? extends PathArgument, ?>> iterator = data.getValue().iterator();
         while(iterator.hasNext()) {
-            DataContainerChild<? extends PathArgument, ?> child = iterator.next();
+            final DataContainerChild<? extends PathArgument, ?> child = iterator.next();
             nnWriter.write(child);
             nnWriter.flush();
         }
index 8e0ff5c859f101003f69ea4a4ed6b58daa6e3072..8325d84e3553cf660b17e00e352463faaff4bdf1 100644 (file)
@@ -14,6 +14,12 @@ import org.opendaylight.controller.networkconfig.neutron.INeutronObject;
 
 import java.util.List;
 
+/**
+ * This interface defines the methods for Neutron Requests
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.northbound.api.INeutronRequest}
+ */
+@Deprecated
 public interface INeutronRequest<T extends INeutronObject> {
     public T getSingleton();
     public boolean isSingleton();
index 9b4f579025e55340a8317d6cda20a0368c014e25..e0a59091f1c5c968d1c3c65233fb32bc3fc9c656 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of Firewall Rules needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronFirewallAware}
  */
 
+@Deprecated
 public interface INeutronFirewallAware {
 
     /**
index c986bffd6b6a72574e42374888e44dc9b87061ff..5e7fbb2090ca8bab3ba4ca8d018803053bb88eb0 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB OpenStack Firewall objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronFirewallCRUD}
  */
 
+@Deprecated
 public interface INeutronFirewallCRUD {
     /**
      * Applications call this interface method to determine if a particular
index 203d513923c406c5fef5cca5e6eb2034312ffb1a..ba467834d4dccc78d0e287f32462231993ef5892 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of Firewall Policies needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronFirewallPolicyAware}
  */
 
+@Deprecated
 public interface INeutronFirewallPolicyAware {
 
     /**
index 6049656a6e309069f997adb755d7304faeb1a840..f280ff2266f63813a9ed09effd1aa1d2806aa3bf 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB OpenStack Firewall Policy objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronFirewallPolicyCRUD}
  */
 
+@Deprecated
 public interface INeutronFirewallPolicyCRUD {
     /**
      * Applications call this interface method to determine if a particular
index a663058328f8fef01bc9ee74b436e4d03b837018..5863cdd0dc37f06e4928c8341a3abf2c509cde93 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of Firewall Rules needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronFirewallRuleAware}
  */
 
+@Deprecated
 public interface INeutronFirewallRuleAware {
 
     /**
index 990896bdb7249591cf90e4f2509e9be36cfa572f..b0c8c613c135c615a955062fcecfa1248a7596d7 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB OpenStack Firewall Rule objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronFirewallRuleCRUD}
  */
 
+@Deprecated
 public interface INeutronFirewallRuleCRUD {
     /**
      * Applications call this interface method to determine if a particular
index 43175d3236455e1d3f9f4a5df23b3e3e290e9ac0..0a6c76ccb82c3b09835367e9411ffc9cde6e374c 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of Neutron FloatingIPs needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronFloatingIPAware}
  */
 
+@Deprecated
 public interface INeutronFloatingIPAware {
 
     /**
index e416ee7d2f31efb450fc4c6795a687c95eb25591..561e1dae730ba529a42cdb71e8f3512a8d83a3a4 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB FloatingIP objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronFloatingIPCRUD}
  */
 
+@Deprecated
 public interface INeutronFloatingIPCRUD {
     /**
      * Applications call this interface method to determine if a particular
index e4aa5f382b2a152741169c0ebb6ea861240a5b9c..7202b01617ca553ec1814a6855a8a2cace1d6b1d 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of LoadBalancer Rules needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronLoadBalancerAware}
  */
 
+@Deprecated
 public interface INeutronLoadBalancerAware {
 
     /**
index a2ce41eab2b825b4d630ef49d2651e84d28dd5bc..394622fe6c5a6e43c66f4c9cb6f0ccfa3daf3869 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB OpenStack LoadBalancer objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronLoadBalancerCRUD}
  */
 
+@Deprecated
 public interface INeutronLoadBalancerCRUD {
     /**
      * Applications call this interface method to determine if a particular
index 7194da32b4a4f4974b2b068a34a1cedc8df15744..2259435c9eb90f7064408e0db7373de0f5ac8288 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of LoadBalancerHealthMonitor Rules needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronLoadBalancerHealthMonitorAware}
  */
 
+@Deprecated
 public interface INeutronLoadBalancerHealthMonitorAware {
 
     /**
index 78380001dfa9e98faf6cbca5e38caf9031b3a726..f7b11a53aad24c84373a0567a0b882c872dde8f8 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB OpenStack LoadBalancerHealthMonitor objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronLoadBalancerHealthMonitorCRUD}
  */
 
+@Deprecated
 public interface INeutronLoadBalancerHealthMonitorCRUD {
     /**
      * Applications call this interface method to determine if a particular
index 417419f93642d527626596a0418dfcab1ced1515..2043878905a6292218eee7738bf443073abf84cf 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of LoadBalancerListener Rules needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronLoadBalancerListenerAware}
  */
 
+@Deprecated
 public interface INeutronLoadBalancerListenerAware {
 
     /**
index c160f8ed8b6554da6431d96b1868863041eee08c..7336c76cd375098475c09deb3a5271e00e092dc9 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB OpenStack LoadBalancerListener objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronLoadBalancerListenerCRUD}
  */
 
+@Deprecated
 public interface INeutronLoadBalancerListenerCRUD {
     /**
      * Applications call this interface method to determine if a particular
index 16c7d3716973b3d9e4b18d9ed60469e2469b24e5..851df355bb139e9bbd9f49c7d5df9d45f1ca8b22 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of LoadBalancerPool Rules needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronLoadBalancerPoolAware}
  */
 
+@Deprecated
 public interface INeutronLoadBalancerPoolAware {
 
     /**
index 9614448d065a56c3d4ce0ab965720ec94c18dbe9..7fabcd92423fa5a4b252b4202ed98cdc0dbd1190 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB OpenStack LoadBalancerPool objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronLoadBalancerPoolCRUD}
  */
 
+@Deprecated
 public interface INeutronLoadBalancerPoolCRUD {
     /**
      * Applications call this interface method to determine if a particular
index 69a57748ddabc88212411cf6b4ecf3d088a80381..88a16cae4d5e9ae02da1cc8b025690eb17f7afb4 100644 (file)
@@ -7,6 +7,13 @@
  */
 package org.opendaylight.controller.networkconfig.neutron;
 
+/**
+ * This interface defines the methods a service that wishes to be aware of LoadBalancer Pool Members needs to implement
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronLoadBalancerPoolMemberAware}
+ */
+
+@Deprecated
 public interface INeutronLoadBalancerPoolMemberAware {
 
 
index c1f5c7003ce87750d1cff33649e367eedbf94e4f..99b32169e18f25c6dc19ccfd61979f892f330e05 100644 (file)
@@ -10,6 +10,13 @@ package org.opendaylight.controller.networkconfig.neutron;
 
 import java.util.List;
 
+/**
+ * This interface defines the methods for CRUD of NB OpenStack LoadBalancerPoolMember objects
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronLoadBalancerPoolMemberCRUD}
+ */
+
+@Deprecated
 public interface INeutronLoadBalancerPoolMemberCRUD {
 
     /**
index 88d3c1dc6e84726842361add116002fd035ebb24..f5936a63a528c07e5e1427c4d2ff84f72bea9a3c 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of Neutron Networks needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronNetworkAware}
  */
 
+@Deprecated
 public interface INeutronNetworkAware {
 
     /**
index bf900a618f1057c5fbc8babc12635cbec7045d37..7f86536fd96850ef3a74d150ef155dfced1e6b4b 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB network objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronNetworkCRUD}
  */
 
+@Deprecated
 public interface INeutronNetworkCRUD {
     /**
      * Applications call this interface method to determine if a particular
index bebac37c41732bf2bb102d698a2e13202f72302d..e39c07eadc2d67249a7a36e8080127ffcbe603d7 100644 (file)
@@ -12,7 +12,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 
 /**
  * This class contains behaviour common to Neutron configuration objects
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronObject}
  */
+@Deprecated
 public interface INeutronObject {
     public String getID();
     public void setID(String id);
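A minimal migration sketch for code that consumes these deprecated interfaces (hypothetical class; the replacement package is taken verbatim from the @deprecated Javadoc link above, and its getID()/setID(String) signatures are assumed to be unchanged):

    // Hypothetical example: only the import changes, assuming the replacement interface
    // mirrors the deprecated org.opendaylight.controller.networkconfig.neutron.INeutronObject.
    import org.opendaylight.neutron.neutron.spi.INeutronObject;

    public class MigratedNeutronConsumer {
        // Reads the object's ID exactly as with the deprecated interface.
        public String idOf(final INeutronObject object) {
            return object.getID();
        }
    }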
index 3f40ba3879a84a2b8aa517f0baad66e2cf0c1672..3a14c308a2ba2cac1a1832c549c802dd1970945d 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of Neutron Ports needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronPortAware}
  */
 
+@Deprecated
 public interface INeutronPortAware {
 
     /**
index ce30c6806eea294c57bc637e3fb532377c3b21b4..58b007b65d2c784401f4419fa481deddb31786ee 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB Port objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronPortCRUD}
  */
 
+@Deprecated
 public interface INeutronPortCRUD {
     /**
      * Applications call this interface method to determine if a particular
index 3c9e83d490c0e0d561077f42ad3903b1c11e93b9..f4c44343621cf52935f1258b7692fa473400f30a 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of Neutron Routers needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronRouterAware}
  */
 
+@Deprecated
 public interface INeutronRouterAware {
 
     /**
index b1a943fce1946e287271da2984206c84b3942bab..37b2c940e0fdf04cbb14eb3eb266d1738c21ce1d 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB Router objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronRouterCRUD}
  */
 
+@Deprecated
 public interface INeutronRouterCRUD {
     /**
      * Applications call this interface method to determine if a particular
index 0fdf77f968708111a5dd3dd99a442843c762351e..b518924e195820494841a613f44e908b503f2c2a 100644 (file)
@@ -11,8 +11,11 @@ package org.opendaylight.controller.networkconfig.neutron;
 
 /**
  * This interface defines the methods a service that wishes to be aware of Neutron Security Groups needs to implement
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronSecurityGroupAware}
  */
 
+@Deprecated
 public interface INeutronSecurityGroupAware {
 
     /**
index a408ef92a7be75bcfff59195a6f3c6632ac13292..6b20182eae46f6bc5b10214f8dac1f858bd86ae2 100644 (file)
@@ -13,8 +13,11 @@ import java.util.List;
 
 /**
  * This interface defines the methods for CRUD of NB OpenStack Security Group objects
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronSecurityGroupCRUD}
  */
 
+@Deprecated
 public interface INeutronSecurityGroupCRUD {
     /**
      * Applications call this interface method to determine if a particular
index ff2a1c4978f6891998c098905439ec932daf76f0..3be9cc92b696c53ab706f450949e1d277f9753b1 100644 (file)
@@ -11,8 +11,11 @@ package org.opendaylight.controller.networkconfig.neutron;
 
 /**
  * This interface defines the methods required to be aware of Neutron Security Rules
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronSecurityRuleAware}
  */
 
+@Deprecated
 public interface INeutronSecurityRuleAware {
 
     /**
index 73b41c71a4745b204028a04f3a54ea9850ad79b7..a5223b7e8d9167a28de13f111620149067cce9bf 100644 (file)
@@ -13,8 +13,11 @@ import java.util.List;
 
 /**
  * This interface defines the methods for CRUD of NB OpenStack Security Rule objects
+ *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronSecurityRuleCRUD}
  */
 
+@Deprecated
 public interface INeutronSecurityRuleCRUD {
     /**
      * Applications call this interface method to determine if a particular
index fa0707698d6a88208693bfb9233fd523b512c6de..57d382b127afbb76f2e7d5d0bbaf84ac61ad08c0 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.networkconfig.neutron;
 /**
  * This interface defines the methods a service that wishes to be aware of Neutron Subnets needs to implement
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronSubnetAware}
  */
 
+@Deprecated
 public interface INeutronSubnetAware {
 
     /**
index 6f9a6ffb7b54850dce84b900ca08cadd57c00ce4..842068284810e07766c128b18da8371e37a77a9b 100644 (file)
@@ -13,8 +13,10 @@ import java.util.List;
 /**
  * This interface defines the methods for CRUD of NB Subnet objects
  *
+ * @deprecated Replaced by {@link org.opendaylight.neutron.neutron.spi.INeutronSubnetCRUD}
  */
 
+@Deprecated
 public interface INeutronSubnetCRUD {
     /**
      * Applications call this interface method to determine if a particular