Merge "Enable AD-SAL application to configure OF 1.3 PUSH_VLAN action."
authorEd Warnicke <eaw@cisco.com>
Fri, 5 Sep 2014 07:12:07 +0000 (07:12 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 5 Sep 2014 07:12:07 +0000 (07:12 +0000)
56 files changed:
features/mdsal/src/main/resources/features.xml
features/netconf/pom.xml
features/netconf/src/main/resources/features.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/distribution/opendaylight-karaf-resources/pom.xml
opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties
opendaylight/distribution/opendaylight-karaf/pom.xml
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/RuntimeMappingModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractWriteTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/MountPointManagerImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardMBeanFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/AbstractDOMStoreTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadWriteTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedWriteTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/yang/opendaylight-inmemory-datastore-provider.yang
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java

index ae73c71ee4c9f46b362c48e98015b5e27f538497..619eaee8a8e52afae5f488facb5c1ff6badd6a4b 100644 (file)
         <bundle>mvn:org.opendaylight.controller/sal-netconf-connector/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller.model/model-inventory/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/netconf-config-dispatcher/${config.version}</bundle>
+        <configfile finalname='${config.configfile.directory}/${config.netconf.client.configfile}'>mvn:org.opendaylight.controller/netconf-config/${netconf.version}/xml/config</configfile>
+    </feature>
+    <feature name='odl-mdsal-netconf-connector-ssh' version='${project.version}' description="OpenDaylight :: MDSAL :: Netconf Connector + Netconf SSH Server + loopback connection configuration">
+        <feature version='${netconf.version}'>odl-netconf-ssh</feature>
+        <feature version='${project.version}'>odl-mdsal-netconf-connector</feature>
         <configfile finalname="${config.configfile.directory}/${config.netconf.connector.configfile}">mvn:org.opendaylight.controller/netconf-connector-config/${netconf.version}/xml/config</configfile>
     </feature>
     <feature name='odl-restconf' version='${project.version}' description="OpenDaylight :: Restconf">
index d18d227f0013d1e861d56759840403dee0c06ada..46f83fb51414c3f9b48fcfe1c9c4f6a1e92a1522 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-auth</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-tcp</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-ssh</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcpkix-jdk15on</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcprov-jdk15on</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>ietf-netconf-monitoring</artifactId>
       <type>xml</type>
       <classifier>config</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-connector-config</artifactId>
+      <version>${config.version}</version>
+      <type>xml</type>
+      <classifier>config</classifier>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-monitoring</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.aaa</groupId>
+      <artifactId>features-aaa</artifactId>
+      <version>${aaa.version}</version>
+      <classifier>features</classifier>
+      <type>xml</type>
+    </dependency>
   </dependencies>
 
   <build>
index 0a6356231a84d1c671fa07e48212200a61451490..4157212f2e35f9fc1c35f89a5a8a65fe0b262620 100644 (file)
@@ -5,11 +5,15 @@
           xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
   <repository>mvn:org.opendaylight.controller/features-protocol-framework/${protocol-framework.version}/xml/features</repository>
   <repository>mvn:org.opendaylight.controller/features-config/${config.version}/xml/features</repository>
+  <repository>mvn:org.opendaylight.aaa/features-aaa/${aaa.version}/xml/features</repository>
+
   <feature name='odl-netconf-all' version='${project.version}' description="OpenDaylight :: Netconf :: All">
     <feature version='${project.version}'>odl-netconf-api</feature>
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <feature version='${project.version}'>odl-netconf-util</feature>
     <feature version='${project.version}'>odl-netconf-impl</feature>
+    <feature version='${project.version}'>odl-netconf-tcp</feature>
+    <feature version='${project.version}'>odl-netconf-ssh</feature>
     <feature version='${project.version}'>odl-config-netconf-connector</feature>
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
     <feature version='${project.version}'>odl-netconf-client</feature>
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <feature version='${project.version}'>odl-netconf-util</feature>
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
+    <!-- Netconf server without config connector is just an empty shell -->
+    <feature version='${project.version}'>odl-config-netconf-connector</feature>
+    <!-- Netconf will not provide schemas without monitoring -->
+    <feature version='${project.version}'>odl-netconf-monitoring</feature>
     <bundle>mvn:org.opendaylight.controller/netconf-impl/${project.version}</bundle>
   </feature>
+  <feature name='odl-netconf-ssh' version='${project.version}' description="OpenDaylight :: Netconf :: SSSH">
+    <feature version='${project.version}'>odl-netconf-tcp</feature>
+    <feature version='${aaa.version}'>odl-aaa-authn-plugin</feature>
+    <bundle>mvn:org.opendaylight.controller/netconf-ssh/${project.version}</bundle>
+    <bundle>mvn:org.bouncycastle/bcpkix-jdk15on/${bouncycastle.version}</bundle>
+    <bundle>mvn:org.bouncycastle/bcprov-jdk15on/${bouncycastle.version}</bundle>
+  </feature>
+  <feature name='odl-netconf-tcp' version='${project.version}' description="OpenDaylight :: Netconf :: TCP">
+    <feature version='${project.version}'>odl-netconf-impl</feature>
+    <bundle>mvn:org.opendaylight.controller/netconf-tcp/${project.version}</bundle>
+  </feature>
   <feature name='odl-config-netconf-connector' version='${project.version}' description="OpenDaylight :: Netconf :: Connector">
     <feature version='${config.version}'>odl-config-manager</feature>
     <feature version='${project.version}'>odl-netconf-api</feature>
@@ -64,7 +83,6 @@
   <feature name='odl-netconf-client' version='${project.version}' description="OpenDaylight :: Netconf :: Client">
     <feature version='${project.version}'>odl-netconf-netty-util</feature>
     <bundle>mvn:org.opendaylight.controller/netconf-client/${project.version}</bundle>
-    <configfile finalname='${config.configfile.directory}/${config.netconf.client.configfile}'>mvn:org.opendaylight.controller/netconf-config/${netconf.version}/xml/config</configfile>
   </feature>
   <feature name='odl-netconf-monitoring' version='${project.version}' description="OpenDaylight :: Netconf :: Monitoring">
     <feature version='${project.version}'>odl-netconf-util</feature>
index b05170ddc842e6011109dc3b4604cc32d53a5251..2e817b97f36a85eebc9a0256284221f60b2b61a3 100644 (file)
@@ -67,6 +67,7 @@
     <concepts.version>0.5.2-SNAPSHOT</concepts.version>
     <concurrentlinkedhashmap.version>1.4</concurrentlinkedhashmap.version>
     <config.version>0.2.5-SNAPSHOT</config.version>
+    <aaa.version>0.1.0-SNAPSHOT</aaa.version>
     <config.configfile.directory>etc/opendaylight/karaf</config.configfile.directory>
     <config.clustering.configfile>05-clustering.xml</config.clustering.configfile>
     <config.netty.configfile>00-netty.xml</config.netty.configfile>
index 00495a32010e7909aa1a14076e83fd272820acec..e34a5d3c2cbd25e888fde5805618bfbf0bc9899d 100644 (file)
   <artifactId>opendaylight-karaf-resources</artifactId>
   <description>Resources for opendaylight-karaf</description>
   <packaging>jar</packaging>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>copy</id>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <!-- here the phase you need -->
+            <phase>generate-resources</phase>
+            <configuration>
+              <artifactItems>
+                  <!-- Needs to be copied to lib/ext in order to start bouncy provider for mina sshd -->
+                <artifactItem>
+                    <groupId>org.bouncycastle</groupId>
+                    <artifactId>bcprov-jdk15on</artifactId>
+                    <version>${bouncycastle.version}</version>
+                    <outputDirectory>target/classes/lib/ext</outputDirectory>
+                    <destFileName>bcprov-jdk15on-${bouncycastle.version}.jar</destFileName>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>
index a644bf6ee835f0434ec6f4a1678dc48f3fcf9cf2..8a2aa59dfe59d2e9fb9ec9f0ab27abb6f89df11e 100644 (file)
@@ -37,6 +37,8 @@ netconf.tcp.client.port=8383
 netconf.ssh.address=0.0.0.0
 netconf.ssh.port=1830
 netconf.ssh.pk.path = ./configuration/RSA.pk
+# Set security provider to BouncyCastle
+org.apache.karaf.security.providers = org.bouncycastle.jce.provider.BouncyCastleProvider
 
 
 netconf.config.persister.active=1
index cdc592428f9d685ab0e715a6e9f762e5a743cc83..795f68c397d27265ab97997bd5b34b5d5b4b0c3f 100644 (file)
                   <outputDirectory>target/assembly/lib</outputDirectory>
                   <destFileName>karaf.branding-${branding.version}.jar</destFileName>
                 </artifactItem>
+                  <!-- Needs to be copied to lib/ext in order to start bouncy provider for mina sshd -->
+              <artifactItem>
+                  <groupId>org.bouncycastle</groupId>
+                  <artifactId>bcprov-jdk15on</artifactId>
+                  <version>${bouncycastle.version}</version>
+                  <outputDirectory>target/assembly/lib/ext</outputDirectory>
+                  <destFileName>bcprov-jdk15on-${bouncycastle.version}.jar</destFileName>
+              </artifactItem>
               </artifactItems>
             </configuration>
           </execution>
index 1e77a5554f179ee61da82e826d9cbf36269a94f5..3db929b99d9c68065cec7875412901090570530d 100644 (file)
@@ -47,7 +47,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
         final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
         this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
 
-        this.txChain =  dataBroker.createTransactionChain(this);
+        this.txChain = dataBroker.createTransactionChain(this);
         thread = new Thread(this);
         thread.setDaemon(true);
         thread.setName("FlowCapableInventoryProvider");
@@ -81,7 +81,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
             thread.join();
             thread = null;
         }
-        if(txChain != null) {
+        if (txChain != null) {
             txChain.close();
             txChain = null;
         }
@@ -137,8 +137,8 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
 
     @Override
     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
-            final Throwable cause) {
-        LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
+                                         final Throwable cause) {
+        LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
 
     }
 
index a15b1d746c5f5709374387f996056f6f979e2025..754d14f2c4cfc9d23791521e4ee30e6bc468d0ec 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadi
 import org.opendaylight.yangtools.sal.binding.generator.impl.RuntimeGeneratedMappingServiceImpl;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcService;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
@@ -38,8 +39,7 @@ import org.slf4j.LoggerFactory;
 /**
 *
 */
-public final class RuntimeMappingModule extends
-        org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractRuntimeMappingModule {
+public final class RuntimeMappingModule extends AbstractRuntimeMappingModule {
 
     private static final Logger LOG = LoggerFactory.getLogger(RuntimeMappingModule.class);
 
@@ -138,25 +138,24 @@ public final class RuntimeMappingModule extends
 
         @Override
         public Entry<YangInstanceIdentifier, CompositeNode> toDataDom(
-                final Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject> entry) {
+                final Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry) {
             return delegate.toDataDom(entry);
         }
 
         @Override
-        public YangInstanceIdentifier toDataDom(
-                final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject> path) {
+        public YangInstanceIdentifier toDataDom(final InstanceIdentifier<? extends DataObject> path) {
             return delegate.toDataDom(path);
         }
 
         @Override
         public DataObject dataObjectFromDataDom(
-                final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject> path,
+                final InstanceIdentifier<? extends DataObject> path,
                 final CompositeNode result) throws DeserializationException {
             return delegate.dataObjectFromDataDom(path, result);
         }
 
         @Override
-        public org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> fromDataDom(final YangInstanceIdentifier entry)
+        public InstanceIdentifier<?> fromDataDom(final YangInstanceIdentifier entry)
                 throws DeserializationException {
             return delegate.fromDataDom(entry);
         }
index 96a3f1cc3b7ead386165e1d530d6c2e7b0cccb15..eadde73e42c0daf632da84dc84fadfc08a171f92 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.controller.md.sal.binding.impl;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -15,15 +19,10 @@ import org.opendaylight.yangtools.concepts.Delegator;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-
-
 abstract class AbstractForwardedTransaction<T extends AsyncTransaction<YangInstanceIdentifier, NormalizedNode<?, ?>>>
         implements Delegator<T>, Identifiable<Object> {
 
@@ -56,9 +55,10 @@ abstract class AbstractForwardedTransaction<T extends AsyncTransaction<YangInsta
         return codec;
     }
 
-    protected final <T extends DataObject> CheckedFuture<Optional<T>,ReadFailedException> doRead(
+    protected final <D extends DataObject> CheckedFuture<Optional<D>,ReadFailedException> doRead(
             final DOMDataReadTransaction readTx, final LogicalDatastoreType store,
-            final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<T> path) {
+            final InstanceIdentifier<D> path) {
+        Preconditions.checkArgument(!path.isWildcarded(), "Invalid read of wildcarded path %s", path);
 
         return MappingCheckedFuture.create(
                     Futures.transform(readTx.read(store, codec.toNormalized(path)),
index 65a9c1abebd1b1a0373a2a32b742ceb0f94cfec3..4597f0646c030b311ac205613c8d3d4f33a05d39 100644 (file)
@@ -7,23 +7,21 @@
  */
 package org.opendaylight.controller.md.sal.binding.impl;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.CheckedFuture;
 import java.util.Collections;
 import java.util.Map.Entry;
-
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.Identifiable;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  *
@@ -33,34 +31,30 @@ import com.google.common.util.concurrent.CheckedFuture;
 public abstract class AbstractWriteTransaction<T extends DOMDataWriteTransaction> extends
         AbstractForwardedTransaction<T> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteTransaction.class);
-
-    protected AbstractWriteTransaction(final T delegate,
-            final BindingToNormalizedNodeCodec codec) {
+    protected AbstractWriteTransaction(final T delegate, final BindingToNormalizedNodeCodec codec) {
         super(delegate, codec);
     }
 
-
     public final <U extends DataObject> void put(final LogicalDatastoreType store,
             final InstanceIdentifier<U> path, final U data, final boolean createParents) {
-       final Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
-                .toNormalizedNode(path, data);
-        if(createParents) {
+        Preconditions.checkArgument(!path.isWildcarded(), "Cannot put data into wildcarded path %s", path);
+
+        final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec().toNormalizedNode(path, data);
+        if (createParents) {
             ensureParentsByMerge(store, normalized.getKey(), path);
         } else {
             ensureListParentIfNeeded(store,path,normalized);
         }
+
         getDelegate().put(store, normalized.getKey(), normalized.getValue());
     }
 
-
     public final <U extends DataObject> void merge(final LogicalDatastoreType store,
             final InstanceIdentifier<U> path, final U data,final boolean createParents) {
+        Preconditions.checkArgument(!path.isWildcarded(), "Cannot merge data into wildcarded path %s", path);
 
-        final Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
-                .toNormalizedNode(path, data);
-
-        if(createParents) {
+        final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec().toNormalizedNode(path, data);
+        if (createParents) {
             ensureParentsByMerge(store, normalized.getKey(), path);
         } else {
             ensureListParentIfNeeded(store,path,normalized);
@@ -69,7 +63,6 @@ public abstract class AbstractWriteTransaction<T extends DOMDataWriteTransaction
         getDelegate().merge(store, normalized.getKey(), normalized.getValue());
     }
 
-
     /**
      *
      * Ensures list parent if item is list, otherwise noop.
@@ -103,25 +96,24 @@ public abstract class AbstractWriteTransaction<T extends DOMDataWriteTransaction
      * @param normalized Normalized version of data to be written
      */
     private void ensureListParentIfNeeded(final LogicalDatastoreType store, final InstanceIdentifier<?> path,
-            final Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> normalized) {
-        if(Identifiable.class.isAssignableFrom(path.getTargetType())) {
-            org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier parentMapPath = getParent(normalized.getKey()).get();
+            final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalized) {
+        if (Identifiable.class.isAssignableFrom(path.getTargetType())) {
+            YangInstanceIdentifier parentMapPath = getParent(normalized.getKey()).get();
             NormalizedNode<?, ?> emptyParent = getCodec().getDefaultNodeFor(parentMapPath);
             getDelegate().merge(store, parentMapPath, emptyParent);
         }
-
     }
 
     // FIXME (should be probaly part of InstanceIdentifier)
-    protected static Optional<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier> getParent(
-            final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier child) {
+    protected static Optional<YangInstanceIdentifier> getParent(
+            final YangInstanceIdentifier child) {
 
         Iterable<PathArgument> mapEntryItemPath = child.getPathArguments();
         int parentPathSize = Iterables.size(mapEntryItemPath) - 1;
-        if(parentPathSize > 1) {
-            return Optional.of(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.create(Iterables.limit(mapEntryItemPath,  parentPathSize)));
+        if (parentPathSize > 1) {
+            return Optional.of(YangInstanceIdentifier.create(Iterables.limit(mapEntryItemPath,  parentPathSize)));
         } else if(parentPathSize == 0) {
-            return Optional.of(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.create(Collections.<PathArgument>emptyList()));
+            return Optional.of(YangInstanceIdentifier.create(Collections.<PathArgument>emptyList()));
         } else {
             return Optional.absent();
         }
@@ -136,11 +128,13 @@ public abstract class AbstractWriteTransaction<T extends DOMDataWriteTransaction
      * @param path
      */
     protected abstract void ensureParentsByMerge(LogicalDatastoreType store,
-            org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier key, InstanceIdentifier<?> path);
+            YangInstanceIdentifier key, InstanceIdentifier<?> path);
 
     protected final void doDelete(final LogicalDatastoreType store,
             final InstanceIdentifier<?> path) {
-        final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier normalized = getCodec().toNormalized(path);
+        Preconditions.checkArgument(!path.isWildcarded(), "Cannot delete wildcarded path %s", path);
+
+        final YangInstanceIdentifier normalized = getCodec().toNormalized(path);
         getDelegate().delete(store, normalized);
     }
 
index c390fe70491a9cfea76996e866008258b1979e26..c3f46b2a625226eebdfce228a7ee9f4ea16f856a 100644 (file)
@@ -7,10 +7,11 @@
  */
 package org.opendaylight.controller.sal.binding.impl;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
-
 import org.opendaylight.controller.md.sal.binding.util.AbstractBindingSalProviderInstance;
 import org.opendaylight.controller.sal.binding.api.mount.MountProviderInstance;
 import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
@@ -20,9 +21,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
 public class MountPointManagerImpl implements MountProviderService {
 
     public final Logger LOG = LoggerFactory.getLogger(MountPointManagerImpl.class);
@@ -113,7 +111,7 @@ public class MountPointManagerImpl implements MountProviderService {
 
         private final InstanceIdentifier<?> identifier;
 
-        public BindingMountPointImpl(final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> identifier,
+        public BindingMountPointImpl(final InstanceIdentifier<?> identifier,
                 final RpcProviderRegistryImpl rpcRegistry, final NotificationBrokerImpl notificationBroker,
                 final DataBrokerImpl dataBroker) {
             super(rpcRegistry, notificationBroker, dataBroker);
index af8a987c73315702b6e7db3910ab21d83fcd6f01..1021ddeee7a5348f26cbaa8845059a515c537f92 100644 (file)
@@ -9,13 +9,15 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+
 import scala.concurrent.duration.Duration;
 
 import java.util.concurrent.TimeUnit;
 
 /**
- * Contains contextual data for shards.
+ * Contains contextual data for a data store.
  *
  * @author Thomas Pantelis
  */
@@ -23,16 +25,24 @@ public class DatastoreContext {
 
     private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
     private final Duration shardTransactionIdleTimeout;
+    private final int operationTimeoutInSeconds;
+    private final String dataStoreMXBeanType;
 
     public DatastoreContext() {
         this.dataStoreProperties = null;
+        this.dataStoreMXBeanType = "DistributedDatastore";
         this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
+        this.operationTimeoutInSeconds = 5;
     }
 
-    public DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
-        Duration shardTransactionIdleTimeout) {
+    public DatastoreContext(String dataStoreMXBeanType,
+            InMemoryDOMDataStoreConfigProperties dataStoreProperties,
+            Duration shardTransactionIdleTimeout,
+            int operationTimeoutInSeconds) {
+        this.dataStoreMXBeanType = dataStoreMXBeanType;
         this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
-        this.shardTransactionIdleTimeout = Preconditions.checkNotNull(shardTransactionIdleTimeout);
+        this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
     }
 
     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
@@ -43,5 +53,11 @@ public class DatastoreContext {
         return shardTransactionIdleTimeout;
     }
 
+    public String getDataStoreMXBeanType() {
+        return dataStoreMXBeanType;
+    }
 
+    public int getOperationTimeoutInSeconds() {
+        return operationTimeoutInSeconds;
+    }
 }
index 0a137e07df43a1bb7ca2fb3e854d7f63adfd46a3..db01d515354a9d166e2b906d8cd7168e7c39deb0 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.concurrent.TimeUnit;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
@@ -22,7 +20,6 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -36,8 +33,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.duration.Duration;
-
 /**
  *
  */
@@ -46,42 +41,30 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
     private final ActorContext actorContext;
-    private final DatastoreContext datastoreContext;
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
-            Configuration configuration, DistributedDataStoreProperties dataStoreProperties) {
+            Configuration configuration, DatastoreContext datastoreContext) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
-
+        Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
 
         String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
-        datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.create(
-                dataStoreProperties.getMaxShardDataChangeExecutorPoolSize(),
-                dataStoreProperties.getMaxShardDataChangeExecutorQueueSize(),
-                dataStoreProperties.getMaxShardDataChangeListenerQueueSize()),
-                Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
-                        TimeUnit.MINUTES));
+        actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
+                ShardManager.props(type, cluster, configuration, datastoreContext)
+                    .withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
 
-        actorContext
-                = new ActorContext(
-                    actorSystem, actorSystem.actorOf(
-                        ShardManager.props(type, cluster, configuration, datastoreContext).
-                            withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
-
-        actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
+        actorContext.setOperationTimeout(datastoreContext.getOperationTimeoutInSeconds());
     }
 
     public DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
-        this.datastoreContext = new DatastoreContext();
     }
 
-
     @SuppressWarnings("unchecked")
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
index 72b593f0106676a28bd251837f2566600d9359f1..8739ed1966b618a3843c8e23e5671ea05eb48f15 100644 (file)
@@ -16,13 +16,13 @@ import org.osgi.framework.BundleContext;
 
 public class DistributedDataStoreFactory {
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
-            DistributedDataStoreProperties dataStoreProperties, BundleContext bundleContext) {
+            DatastoreContext datastoreContext, BundleContext bundleContext) {
 
         ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
             new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
-                    config, dataStoreProperties );
+                    config, datastoreContext );
         ShardStrategyFactory.setConfiguration(config);
         schemaService.registerSchemaContextListener(dataStore);
         return dataStore;
index df3245ffb225d9d3b0baf704e81e499ebdced314..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 (file)
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-/**
- * Wrapper class for DistributedDataStore configuration properties.
- *
- * @author Thomas Pantelis
- */
-public class DistributedDataStoreProperties {
-    private final int maxShardDataChangeListenerQueueSize;
-    private final int maxShardDataChangeExecutorQueueSize;
-    private final int maxShardDataChangeExecutorPoolSize;
-    private final int shardTransactionIdleTimeoutInMinutes;
-    private final int operationTimeoutInSeconds;
-
-    public DistributedDataStoreProperties() {
-        maxShardDataChangeListenerQueueSize = 1000;
-        maxShardDataChangeExecutorQueueSize = 1000;
-        maxShardDataChangeExecutorPoolSize = 20;
-        shardTransactionIdleTimeoutInMinutes = 10;
-        operationTimeoutInSeconds = 5;
-    }
-
-    public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
-            int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
-            int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
-        this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
-        this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
-        this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
-        this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
-        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
-    }
-
-    public int getMaxShardDataChangeListenerQueueSize() {
-        return maxShardDataChangeListenerQueueSize;
-    }
-
-    public int getMaxShardDataChangeExecutorQueueSize() {
-        return maxShardDataChangeExecutorQueueSize;
-    }
-
-    public int getMaxShardDataChangeExecutorPoolSize() {
-        return maxShardDataChangeExecutorPoolSize;
-    }
-
-    public int getShardTransactionIdleTimeoutInMinutes() {
-        return shardTransactionIdleTimeoutInMinutes;
-    }
-
-    public int getOperationTimeoutInSeconds() {
-        return operationTimeoutInSeconds;
-    }
-}
index 6a6a181b6c03ac744d02a3e8e815011d2cf99c3f..7d570046d406feec976620f9215398475711a756 100644 (file)
@@ -10,19 +10,22 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -35,30 +38,35 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionR
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -102,12 +110,15 @@ public class Shard extends RaftActor {
 
     private SchemaContext schemaContext;
 
+    private ActorRef createSnapshotTransaction;
+
     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-            DatastoreContext datastoreContext) {
+            DatastoreContext datastoreContext, SchemaContext schemaContext) {
         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
 
         this.name = name;
         this.datastoreContext = datastoreContext;
+        this.schemaContext = schemaContext;
 
         String setting = System.getProperty("shard.persistent");
 
@@ -118,8 +129,14 @@ public class Shard extends RaftActor {
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
 
-        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
+        if(schemaContext != null) {
+            store.onGlobalContextUpdated(schemaContext);
+        }
 
+        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
+                datastoreContext.getDataStoreMXBeanType());
+        shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
+        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
 
     }
 
@@ -137,12 +154,13 @@ public class Shard extends RaftActor {
 
     public static Props props(final ShardIdentifier name,
         final Map<ShardIdentifier, String> peerAddresses,
-        DatastoreContext datastoreContext) {
+        DatastoreContext datastoreContext, SchemaContext schemaContext) {
         Preconditions.checkNotNull(name, "name should not be null");
         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
-        Preconditions.checkNotNull(datastoreContext, "shardContext should not be null");
+        Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
+        Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
 
-        return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
+        return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
     }
 
     @Override public void onReceiveRecover(Object message) {
@@ -167,6 +185,15 @@ public class Shard extends RaftActor {
             } else if (getLeader() != null) {
                 getLeader().forward(message, getContext());
             }
+        } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+            // This must be for install snapshot. Don't want to open this up and trigger
+            // deSerialization
+            self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self());
+
+            // Send a PoisonPill instead of sending close transaction because we do not really need
+            // a response
+            getSender().tell(PoisonPill.getInstance(), self());
+
         } else if (message instanceof RegisterChangeListener) {
             registerChangeListener((RegisterChangeListener) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -190,60 +217,79 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTypedTransactionActor(
-        CreateTransaction createTransaction,
+        int transactionType,
         ShardTransactionIdentifier transactionId) {
-        if (createTransaction.getTransactionType()
+
+        if(this.schemaContext == null){
+            throw new NullPointerException("schemaContext should not be null");
+        }
+
+        if (transactionType
             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
 
             shardMBean.incrementReadOnlyTransactionCount();
 
             return getContext().actorOf(
                 ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
-                        schemaContext,datastoreContext, name.toString()), transactionId.toString());
+                        schemaContext,datastoreContext, shardMBean), transactionId.toString());
 
-        } else if (createTransaction.getTransactionType()
+        } else if (transactionType
             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 
             shardMBean.incrementReadWriteTransactionCount();
 
             return getContext().actorOf(
                 ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
-                        schemaContext, datastoreContext,name.toString()), transactionId.toString());
+                        schemaContext, datastoreContext, shardMBean), transactionId.toString());
 
 
-        } else if (createTransaction.getTransactionType()
+        } else if (transactionType
             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
 
             shardMBean.incrementWriteOnlyTransactionCount();
 
             return getContext().actorOf(
                 ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
-                        schemaContext, datastoreContext, name.toString()), transactionId.toString());
+                        schemaContext, datastoreContext, shardMBean), transactionId.toString());
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
-                    + createTransaction.getTransactionType());
+                    + transactionType);
         }
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
+        createTransaction(createTransaction.getTransactionType(),
+            createTransaction.getTransactionId());
+    }
+
+    private ActorRef createTransaction(int transactionType, String remoteTransactionId) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
-                .remoteTransactionId(createTransaction.getTransactionId())
+                .remoteTransactionId(remoteTransactionId)
                 .build();
         LOG.debug("Creating transaction : {} ", transactionId);
         ActorRef transactionActor =
-            createTypedTransactionActor(createTransaction, transactionId);
+            createTypedTransactionActor(transactionType, transactionId);
 
         getSender()
             .tell(new CreateTransactionReply(
                     Serialization.serializedActorPath(transactionActor),
-                    createTransaction.getTransactionId()).toSerializable(),
-                getSelf()
-            );
+                    remoteTransactionId).toSerializable(),
+                getSelf());
+
+        return transactionActor;
     }
 
+    private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
+        throws ExecutionException, InterruptedException {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        commitCohort.preCommit().get();
+        commitCohort.commit().get();
+    }
+
+
     private void commit(final ActorRef sender, Object serialized) {
         Modification modification = MutableCompositeModification
             .fromSerializable(serialized, schemaContext);
@@ -253,16 +299,11 @@ public class Shard extends RaftActor {
             LOG.debug(
                 "Could not find cohort for modification : {}. Writing modification using a new transaction",
                 modification);
-            DOMStoreReadWriteTransaction transaction =
-                store.newReadWriteTransaction();
+            DOMStoreWriteTransaction transaction =
+                store.newWriteOnlyTransaction();
             modification.apply(transaction);
-            DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-            ListenableFuture<Void> future =
-                commitCohort.preCommit();
             try {
-                future.get();
-                future = commitCohort.commit();
-                future.get();
+                syncCommitTransaction(transaction);
             } catch (InterruptedException | ExecutionException e) {
                 shardMBean.incrementFailedTransactionsCount();
                 LOG.error("Failed to commit", e);
@@ -281,7 +322,7 @@ public class Shard extends RaftActor {
             public void onSuccess(Void v) {
                 sender.tell(new CommitTransactionReply().toSerializable(), self);
                 shardMBean.incrementCommittedTransactionCount();
-                shardMBean.setLastCommittedTransactionTime(new Date());
+                shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
             }
 
             @Override
@@ -311,9 +352,14 @@ public class Shard extends RaftActor {
 
     private void updateSchemaContext(UpdateSchemaContext message) {
         this.schemaContext = message.getSchemaContext();
+        updateSchemaContext(message.getSchemaContext());
         store.onGlobalContextUpdated(message.getSchemaContext());
     }
 
+    @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
+        store.onGlobalContextUpdated(schemaContext);
+    }
+
     private void registerChangeListener(
         RegisterChangeListener registerChangeListener) {
 
@@ -358,9 +404,9 @@ public class Shard extends RaftActor {
     private void createTransactionChain() {
         DOMStoreTransactionChain chain = store.createTransactionChain();
         ActorRef transactionChain = getContext().actorOf(
-                ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
+                ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
-                getSelf());
+            getSelf());
     }
 
     @Override protected void applyState(ActorRef clientActor, String identifier,
@@ -396,11 +442,39 @@ public class Shard extends RaftActor {
     }
 
     @Override protected void createSnapshot() {
-        throw new UnsupportedOperationException("createSnapshot");
+        if (createSnapshotTransaction == null) {
+
+            // Create a transaction. We are really going to treat the transaction as a worker
+            // so that this actor does not get block building the snapshot
+            createSnapshotTransaction = createTransaction(
+                TransactionProxy.TransactionType.READ_ONLY.ordinal(),
+                "createSnapshot");
+
+            createSnapshotTransaction.tell(
+                new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
+
+        }
     }
 
-    @Override protected void applySnapshot(ByteString snapshot) {
-        throw new UnsupportedOperationException("applySnapshot");
+    @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
+        // Since this will be done only on Recovery or when this actor is a Follower
+        // we can safely commit everything in here. We not need to worry about event notifications
+        // as they would have already been disabled on the follower
+        try {
+            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+            NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
+            NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
+                .decode(YangInstanceIdentifier.builder().build(), serializedNode);
+
+            // delete everything first
+            transaction.delete(YangInstanceIdentifier.builder().build());
+
+            // Add everything from the remote node back
+            transaction.write(YangInstanceIdentifier.builder().build(), node);
+            syncCommitTransaction(transaction);
+        } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
+            LOG.error(e, "An exception occurred when applying snapshot");
+        }
     }
 
     @Override protected void onStateChanged() {
@@ -438,17 +512,42 @@ public class Shard extends RaftActor {
         final ShardIdentifier name;
         final Map<ShardIdentifier, String> peerAddresses;
         final DatastoreContext datastoreContext;
+        final SchemaContext schemaContext;
 
         ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-                DatastoreContext datastoreContext) {
+                DatastoreContext datastoreContext, SchemaContext schemaContext) {
             this.name = name;
             this.peerAddresses = peerAddresses;
             this.datastoreContext = datastoreContext;
+            this.schemaContext = schemaContext;
         }
 
         @Override
         public Shard create() throws Exception {
-            return new Shard(name, peerAddresses, datastoreContext);
+            return new Shard(name, peerAddresses, datastoreContext, schemaContext);
         }
     }
+
+    @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
+        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
+
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
+            transaction.read(YangInstanceIdentifier.builder().build());
+
+        NormalizedNode<?, ?> node = future.get().get();
+
+        transaction.close();
+
+        return node;
+    }
+
+    @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
+        throws ExecutionException, InterruptedException {
+        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+
+        transaction.write(id, node);
+
+        syncCommitTransaction(transaction);
+    }
+
 }
index e51d49bff2aff8b6380081e0e772765d172246b0..58cdefe5371d2b58be6e7c9f5e461734f34acd07 100644 (file)
@@ -17,9 +17,7 @@ import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
-
 import com.google.common.base.Preconditions;
-
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -32,8 +30,8 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
 import java.util.ArrayList;
@@ -89,9 +87,7 @@ public class ShardManager extends AbstractUntypedActor {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        // Create all the local Shards and make them a child of the ShardManager
-        // TODO: This may need to be initiated when we first get the schema context
-        createLocalShards();
+        //createLocalShards(null);
     }
 
     public static Props props(final String type,
@@ -162,8 +158,14 @@ public class ShardManager extends AbstractUntypedActor {
      * @param message
      */
     private void updateSchemaContext(Object message) {
-        for(ShardInformation info : localShards.values()){
-            info.getActor().tell(message,getSelf());
+        SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+        if(localShards.size() == 0){
+            createLocalShards(schemaContext);
+        } else {
+            for (ShardInformation info : localShards.values()) {
+                info.getActor().tell(message, getSelf());
+            }
         }
     }
 
@@ -235,7 +237,7 @@ public class ShardManager extends AbstractUntypedActor {
      * runs
      *
      */
-    private void createLocalShards() {
+    private void createLocalShards(SchemaContext schemaContext) {
         String memberName = this.cluster.getCurrentMemberName();
         List<String> memberShardNames =
             this.configuration.getMemberShardNames(memberName);
@@ -245,16 +247,14 @@ public class ShardManager extends AbstractUntypedActor {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext).
+                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext).
                     withMailbox(ActorContext.MAILBOX), shardId.toString());
-
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
 
-        mBean = ShardManagerInfo
-            .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
-
+        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+                    datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
     }
 
     /**
index 91d629432f41f7633d128dd6c17e03a283c6bc18..0e9fd113c53108537c4613569e25cce1805c8877 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@ -26,8 +27,8 @@ public class ShardReadTransaction extends ShardTransaction {
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,String shardName) {
-        super(shardActor, schemaContext, shardName);
+            SchemaContext schemaContext, ShardStats shardStats) {
+        super(shardActor, schemaContext, shardStats);
         this.transaction = transaction;
     }
 
index bd71c27fd67d8b2d2b9d99808a82f91f878d35fc..d04ec233eaf33ae3745eee9fabfd72080f469240 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
@@ -30,8 +31,8 @@ public class ShardReadWriteTransaction extends ShardTransaction {
     private final DOMStoreReadWriteTransaction transaction;
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,String shardName) {
-        super(shardActor, schemaContext, shardName);
+            SchemaContext schemaContext, ShardStats shardStats) {
+        super(shardActor, schemaContext, shardStats);
         this.transaction = transaction;
     }
 
index 3b0e0934d9b67c6a18fde38edeb27ae7e92e52f4..65f865b0c43ecdd6da13754605bccdc91a6f472e 100644 (file)
@@ -13,10 +13,12 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.japi.Creator;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -73,22 +75,21 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
 
     private final ActorRef shardActor;
     protected final SchemaContext schemaContext;
-    private final String  shardName;
-
+    private final ShardStats shardStats;
 
     private final MutableCompositeModification modification = new MutableCompositeModification();
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-        String shardName) {
+            ShardStats shardStats) {
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
-        this.shardName = shardName;
+        this.shardStats = shardStats;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,DatastoreContext datastoreContext, String shardName) {
+            SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats) {
         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
-           datastoreContext, shardName));
+           datastoreContext, shardStats));
     }
 
     protected abstract DOMStoreTransaction getDOMStoreTransaction();
@@ -137,7 +138,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
                         sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
                     }
                 } catch (Exception e) {
-                    ShardMBeanFactory.getShardStatsMBean(shardName).incrementFailedReadTransactionsCount();
+                    shardStats.incrementFailedReadTransactionsCount();
                     sender.tell(new akka.actor.Status.Failure(e), self);
                 }
 
@@ -196,7 +197,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
     protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
         ActorRef cohortActor = getContext().actorOf(
-            ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardName), "cohort");
+            ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardStats), "cohort");
         getSender()
         .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
 
@@ -210,13 +211,14 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
         final ActorRef shardActor;
         final SchemaContext schemaContext;
         final DatastoreContext datastoreContext;
-        final String shardName;
+        final ShardStats shardStats;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
-                SchemaContext schemaContext, DatastoreContext datastoreContext, String shardName) {
+                SchemaContext schemaContext, DatastoreContext datastoreContext,
+                ShardStats shardStats) {
             this.transaction = transaction;
             this.shardActor = shardActor;
-            this.shardName = shardName;
+            this.shardStats = shardStats;
             this.schemaContext = schemaContext;
             this.datastoreContext = datastoreContext;
         }
@@ -226,13 +228,13 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
             ShardTransaction tx;
             if(transaction instanceof DOMStoreReadWriteTransaction) {
                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, schemaContext, shardName);
+                        shardActor, schemaContext, shardStats);
             } else if(transaction instanceof DOMStoreReadTransaction) {
                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        schemaContext, shardName);
+                        schemaContext, shardStats);
             } else {
                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, schemaContext, shardName);
+                        shardActor, schemaContext, shardStats);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index e7a181865e70c54917a272a6268704316111d13c..484bd54a0743616ebb3fdb3bd95f0c1c253b1996 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -27,14 +28,14 @@ public class ShardTransactionChain extends AbstractUntypedActor {
     private final DOMStoreTransactionChain chain;
     private final DatastoreContext datastoreContext;
     private final SchemaContext schemaContext;
-    private final String shardName;
+    private final ShardStats shardStats;
 
     public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext,String shardName) {
+            DatastoreContext datastoreContext, ShardStats shardStats) {
         this.chain = chain;
         this.datastoreContext = datastoreContext;
         this.schemaContext = schemaContext;
-        this.shardName = shardName;
+        this.shardStats = shardStats;
     }
 
     @Override
@@ -60,17 +61,17 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                 TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext,shardName), transactionId);
+                            schemaContext, datastoreContext, shardStats), transactionId);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
-                            schemaContext, datastoreContext,shardName), transactionId);
+                            schemaContext, datastoreContext, shardStats), transactionId);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext,shardName), transactionId);
+                            schemaContext, datastoreContext, shardStats), transactionId);
         } else {
             throw new IllegalArgumentException (
                     "CreateTransaction message has unidentified transaction type=" +
@@ -87,8 +88,9 @@ public class ShardTransactionChain extends AbstractUntypedActor {
     }
 
     public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-        DatastoreContext datastoreContext, String shardName) {
-        return Props.create(new ShardTransactionChainCreator(chain, schemaContext, datastoreContext, shardName));
+        DatastoreContext datastoreContext, ShardStats shardStats) {
+        return Props.create(new ShardTransactionChainCreator(chain, schemaContext,
+                datastoreContext, shardStats));
     }
 
     private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
@@ -97,20 +99,20 @@ public class ShardTransactionChain extends AbstractUntypedActor {
         final DOMStoreTransactionChain chain;
         final DatastoreContext datastoreContext;
         final SchemaContext schemaContext;
-        final String shardName;
+        final ShardStats shardStats;
 
 
         ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext, String shardName) {
+            DatastoreContext datastoreContext, ShardStats shardStats) {
             this.chain = chain;
             this.datastoreContext = datastoreContext;
             this.schemaContext = schemaContext;
-            this.shardName = shardName;
+            this.shardStats = shardStats;
         }
 
         @Override
         public ShardTransactionChain create() throws Exception {
-            return new ShardTransactionChain(chain, schemaContext, datastoreContext,shardName);
+            return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats);
         }
     }
 }
index 41c46c3375c58c6177f91b4e397daafaf426b064..396b27a0423c95bdee59e6678c7c12f8796b6c36 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
@@ -28,8 +29,8 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,String shardName) {
-        super(shardActor, schemaContext, shardName);
+            SchemaContext schemaContext, ShardStats shardStats) {
+        super(shardActor, schemaContext, shardStats);
         this.transaction = transaction;
     }
 
index 5a6d0eca5c2a159963febc4ee9d6436f2e864a5d..2dce6a1079c4fdbb0a8e2fa090fa018908d3f5ce 100644 (file)
@@ -19,7 +19,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -35,25 +35,25 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final ActorRef shardActor;
     private final CompositeModification modification;
-    private final String shardName;
+    private final ShardStats shardStats;
 
     public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
-        ActorRef shardActor, CompositeModification modification,String shardName) {
+        ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
 
         this.cohort = cohort;
         this.shardActor = shardActor;
         this.modification = modification;
-        this.shardName = shardName;
+        this.shardStats = shardStats;
     }
 
     private final LoggingAdapter log =
         Logging.getLogger(getContext().system(), this);
 
     public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
-        final ActorRef shardActor, final CompositeModification modification,
-        String shardName) {
+            final ActorRef shardActor, final CompositeModification modification,
+            ShardStats shardStats) {
         return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification,
-           shardName));
+                shardStats));
     }
 
     @Override
@@ -83,7 +83,7 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void v) {
-                ShardMBeanFactory.getShardStatsMBean(shardName).incrementAbortTransactionsCount();
+                shardStats.incrementAbortTransactionsCount();
                 sender
                     .tell(new AbortTransactionReply().toSerializable(),
                     self);
@@ -154,19 +154,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final DOMStoreThreePhaseCommitCohort cohort;
         final ActorRef shardActor;
         final CompositeModification modification;
-        final String shardName;
+        final ShardStats shardStats;
 
         ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
-            ActorRef shardActor, CompositeModification modification, String shardName) {
+            ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
             this.cohort = cohort;
             this.shardActor = shardActor;
             this.modification = modification;
-            this.shardName = shardName;
+            this.shardStats = shardStats;
         }
 
         @Override
         public ThreePhaseCommitCohort create() throws Exception {
-            return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardName);
+            return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardStats);
         }
     }
 }
index 3c46935d9839a3b4ad634ee449319146c58d8583..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 (file)
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-
-package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
-
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-
-/**
- * All MBeans should extend this class that help in registering and
- * unregistering the MBeans.
- * @author Basheeruddin <syedbahm@cisco.com>
- */
-
-
-public abstract class AbstractBaseMBean {
-
-
-  public static String BASE_JMX_PREFIX = "org.opendaylight.controller:";
-  public static String JMX_TYPE_DISTRIBUTED_DATASTORE = "DistributedDatastore";
-  public static String JMX_CATEGORY_SHARD = "Shard";
-  public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(AbstractBaseMBean.class);
-
-  MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-  /**
-   * gets the MBean ObjectName
-   *
-   * @return Object name of the MBean
-   * @throws MalformedObjectNameException - The bean name does not have the right format.
-   * @throws NullPointerException - The bean name is null
-   */
-  protected ObjectName getMBeanObjectName()
-      throws MalformedObjectNameException, NullPointerException {
-    String name = BASE_JMX_PREFIX + "type="+getMBeanType()+",Category="+
-                                   getMBeanCategory() + ",name="+
-                                   getMBeanName();
-
-
-    return new ObjectName(name);
-  }
-
-  public boolean registerMBean() {
-    boolean registered = false;
-    try {
-      // Object to identify MBean
-      final ObjectName mbeanName = this.getMBeanObjectName();
-
-      Preconditions.checkArgument(mbeanName != null,
-          "Object name of the MBean cannot be null");
-
-      LOG.debug("Register MBean {}", mbeanName);
-
-      // unregistered if already registered
-      if (server.isRegistered(mbeanName)) {
-
-        LOG.debug("MBean {} found to be already registered", mbeanName);
-
-        try {
-          unregisterMBean(mbeanName);
-        } catch (Exception e) {
-
-          LOG.warn("unregister mbean {} resulted in exception {} ", mbeanName,
-              e);
-        }
-      }
-      server.registerMBean(this, mbeanName);
-
-      LOG.debug("MBean {} registered successfully",
-          mbeanName.getCanonicalName());
-      registered = true;
-    } catch (Exception e) {
-
-      LOG.error("registration failed {}", e);
-
-    }
-    return registered;
-  }
-
-
-  public boolean unregisterMBean() {
-    boolean unregister = false;
-    try {
-      ObjectName mbeanName = this.getMBeanObjectName();
-      unregister = true;
-      unregisterMBean(mbeanName);
-    } catch (Exception e) {
-
-      LOG.error("Failed when unregistering MBean {}", e);
-    }
-    return unregister;
-  }
-
-  private void unregisterMBean(ObjectName mbeanName)
-      throws MBeanRegistrationException, InstanceNotFoundException {
-
-    server.unregisterMBean(mbeanName);
-
-  }
-
-
-  /**
-   * @return name of bean
-   */
-  protected abstract String getMBeanName();
-
-  /**
-   * @return type of the MBean
-   */
-  protected abstract String getMBeanType();
-
-
-  /**
-   * @return Category name of teh bean
-   */
-  protected abstract String getMBeanCategory();
-
-  //require for test cases
-  public MBeanServer getMBeanServer() {
-    return server;
-  }
-}
index 2a409c0300889cb34586524fa12d2381b15f4a8b..946e525a6d0019418fd00e310064b0f78487485a 100644 (file)
@@ -7,28 +7,41 @@
  */
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 
 /**
  * @author Basheeruddin syedbahm@cisco.com
  *
  */
 public class ShardMBeanFactory {
-    private static Map<String, ShardStats> shardMBeans =
-        new HashMap<String, ShardStats>();
 
-    public static ShardStats getShardStatsMBean(String shardName) {
-        if (shardMBeans.containsKey(shardName)) {
-            return shardMBeans.get(shardName);
-        } else {
-            ShardStats shardStatsMBeanImpl = new ShardStats(shardName);
+    private static final Logger LOG = LoggerFactory.getLogger(ShardMBeanFactory.class);
 
-            if (shardStatsMBeanImpl.registerMBean()) {
-                shardMBeans.put(shardName, shardStatsMBeanImpl);
-            }
-            return shardStatsMBeanImpl;
+    private static Cache<String,ShardStats> shardMBeansCache =
+                                      CacheBuilder.newBuilder().weakValues().build();
+
+    public static ShardStats getShardStatsMBean(final String shardName, final String mxBeanType) {
+        final String finalMXBeanType = mxBeanType != null ? mxBeanType : "DistDataStore";
+        try {
+            return shardMBeansCache.get(shardName, new Callable<ShardStats>() {
+                @Override
+                public ShardStats call() throws Exception {
+                    ShardStats shardStatsMBeanImpl = new ShardStats(shardName, finalMXBeanType);
+                    shardStatsMBeanImpl.registerMBean();
+                    return shardStatsMBeanImpl;
+                }
+            });
+        } catch(ExecutionException e) {
+            LOG.error(String.format("Could not create MXBean for shard: %s", shardName), e);
+            // Just return an instance that isn't registered.
+            return new ShardStats(shardName, finalMXBeanType);
         }
     }
-
 }
index 22ad8e7f5a3408ff7457876635ced85f19ad2842..0a1964b0533bfc7ead91025e5792f5edda85b844 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
 
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
 /**
+ * Maintains statistics for a shard.
+ *
  * @author  Basheeruddin syedbahm@cisco.com
  */
-public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
+public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
+    public static String JMX_CATEGORY_SHARD = "Shards";
 
-    private final String shardName;
+    private final AtomicLong committedTransactionsCount = new AtomicLong();
 
-    private long committedTransactionsCount = 0L;
+    private final AtomicLong readOnlyTransactionCount = new AtomicLong();
 
-    private long readOnlyTransactionCount = 0L;
+    private final AtomicLong writeOnlyTransactionCount = new AtomicLong();
 
-    private long writeOnlyTransactionCount = 0L;
-
-    private long readWriteTransactionCount = 0L;
+    private final AtomicLong readWriteTransactionCount = new AtomicLong();
 
     private String leader;
 
     private String raftState;
 
-    private long lastLogTerm = -1L;
+    private volatile long lastLogTerm = -1L;
+
+    private volatile long lastLogIndex = -1L;
 
-    private long lastLogIndex = -1L;
+    private volatile long currentTerm = -1L;
 
-    private long currentTerm = -1L;
+    private volatile long commitIndex = -1L;
 
-    private long commitIndex = -1L;
+    private volatile long lastApplied = -1L;
 
-    private long lastApplied = -1L;
+    private volatile long lastCommittedTransactionTime;
 
-    private Date lastCommittedTransactionTime = new Date(0L);
+    private final AtomicLong failedTransactionsCount = new AtomicLong();
 
-    private long failedTransactionsCount = 0L;
+    private final AtomicLong failedReadTransactionsCount = new AtomicLong();
 
-    private long failedReadTransactionsCount = 0L;
+    private final AtomicLong abortTransactionsCount = new AtomicLong();
 
-    private long abortTransactionsCount = 0L;
+    private ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
 
-    private SimpleDateFormat sdf =
+    private ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
+
+    private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
+
+    private final SimpleDateFormat sdf =
         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
-    ShardStats(String shardName) {
-        this.shardName = shardName;
+    public ShardStats(String shardName, String mxBeanType) {
+        super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
+    }
+
+    public void setDataStoreExecutor(ExecutorService dsExecutor) {
+        this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dsExecutor,
+                "notification-executor", getMBeanType(), getMBeanCategory());
     }
 
+    public void setNotificationManager(QueuedNotificationManager<?, ?> manager) {
+        this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
+                "notification-manager", getMBeanType(), getMBeanCategory());
+
+        this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+                "data-store-executor", getMBeanType(), getMBeanCategory());
+    }
 
     @Override
     public String getShardName() {
-        return shardName;
+        return getMBeanName();
     }
 
     @Override
     public long getCommittedTransactionsCount() {
-        return committedTransactionsCount;
+        return committedTransactionsCount.get();
     }
 
-    @Override public String getLeader() {
+    @Override
+    public String getLeader() {
         return leader;
     }
 
-    @Override public String getRaftState() {
+    @Override
+    public String getRaftState() {
         return raftState;
     }
 
-    @Override public long getReadOnlyTransactionCount() {
-        return readOnlyTransactionCount;
+    @Override
+    public long getReadOnlyTransactionCount() {
+        return readOnlyTransactionCount.get();
     }
 
-    @Override public long getWriteOnlyTransactionCount() {
-        return writeOnlyTransactionCount;
+    @Override
+    public long getWriteOnlyTransactionCount() {
+        return writeOnlyTransactionCount.get();
     }
 
-    @Override public long getReadWriteTransactionCount() {
-        return readWriteTransactionCount;
+    @Override
+    public long getReadWriteTransactionCount() {
+        return readWriteTransactionCount.get();
     }
 
-    @Override public long getLastLogIndex() {
+    @Override
+    public long getLastLogIndex() {
         return lastLogIndex;
     }
 
-    @Override public long getLastLogTerm() {
+    @Override
+    public long getLastLogTerm() {
         return lastLogTerm;
     }
 
-    @Override public long getCurrentTerm() {
+    @Override
+    public long getCurrentTerm() {
         return currentTerm;
     }
 
-    @Override public long getCommitIndex() {
+    @Override
+    public long getCommitIndex() {
         return commitIndex;
     }
 
-    @Override public long getLastApplied() {
+    @Override
+    public long getLastApplied() {
         return lastApplied;
     }
 
     @Override
     public String getLastCommittedTransactionTime() {
 
-        return sdf.format(lastCommittedTransactionTime);
+        return sdf.format(new Date(lastCommittedTransactionTime));
     }
 
-    @Override public long getFailedTransactionsCount() {
-        return failedTransactionsCount;
+    @Override
+    public long getFailedTransactionsCount() {
+        return failedTransactionsCount.get();
     }
 
-    @Override public long getFailedReadTransactionsCount() {
-        return failedReadTransactionsCount;
+    @Override
+    public long getFailedReadTransactionsCount() {
+        return failedReadTransactionsCount.get();
     }
 
-    @Override public long getAbortTransactionsCount() {
-        return abortTransactionsCount;
+    @Override
+    public long getAbortTransactionsCount() {
+        return abortTransactionsCount.get();
     }
 
     public long incrementCommittedTransactionCount() {
-        return committedTransactionsCount++;
+        return committedTransactionsCount.incrementAndGet();
     }
 
     public long incrementReadOnlyTransactionCount() {
-        return readOnlyTransactionCount++;
+        return readOnlyTransactionCount.incrementAndGet();
     }
 
     public long incrementWriteOnlyTransactionCount() {
-        return writeOnlyTransactionCount++;
+        return writeOnlyTransactionCount.incrementAndGet();
     }
 
     public long incrementReadWriteTransactionCount() {
-        return readWriteTransactionCount++;
+        return readWriteTransactionCount.incrementAndGet();
     }
 
     public long incrementFailedTransactionsCount() {
-        return failedTransactionsCount++;
+        return failedTransactionsCount.incrementAndGet();
     }
 
     public long incrementFailedReadTransactionsCount() {
-        return failedReadTransactionsCount++;
+        return failedReadTransactionsCount.incrementAndGet();
     }
 
-    public long incrementAbortTransactionsCount () { return abortTransactionsCount++;}
+    public long incrementAbortTransactionsCount ()
+    {
+        return abortTransactionsCount.incrementAndGet();
+    }
 
     public void setLeader(String leader) {
         this.leader = leader;
@@ -180,49 +224,50 @@ public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
         this.lastApplied = lastApplied;
     }
 
-
-    public void setLastCommittedTransactionTime(
-        Date lastCommittedTransactionTime) {
+    public void setLastCommittedTransactionTime(long lastCommittedTransactionTime) {
         this.lastCommittedTransactionTime = lastCommittedTransactionTime;
     }
 
     @Override
-    protected String getMBeanName() {
-        return shardName;
+    public ThreadExecutorStats getDataStoreExecutorStats() {
+        return dataStoreExecutorStatsBean.toThreadExecutorStats();
+    }
+
+    @Override
+    public ThreadExecutorStats getNotificationMgrExecutorStats() {
+        return notificationExecutorStatsBean.toThreadExecutorStats();
     }
 
     @Override
-    protected String getMBeanType() {
-        return JMX_TYPE_DISTRIBUTED_DATASTORE;
+    public List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats() {
+        return notificationManagerStatsBean.getCurrentListenerQueueStats();
     }
 
     @Override
-    protected String getMBeanCategory() {
-        return JMX_CATEGORY_SHARD;
+    public int getMaxNotificationMgrListenerQueueSize() {
+        return notificationManagerStatsBean.getMaxListenerQueueSize();
     }
 
     /**
      * resets the counters related to transactions
      */
-
+    @Override
     public void resetTransactionCounters(){
-        committedTransactionsCount = 0L;
+        committedTransactionsCount.set(0);
 
-        readOnlyTransactionCount = 0L;
+        readOnlyTransactionCount.set(0);
 
-        writeOnlyTransactionCount = 0L;
+        writeOnlyTransactionCount.set(0);
 
-        readWriteTransactionCount = 0L;
+        readWriteTransactionCount.set(0);
 
-        lastCommittedTransactionTime = new Date(0L);
+        lastCommittedTransactionTime = 0;
 
-        failedTransactionsCount = 0L;
+        failedTransactionsCount.set(0);
 
-        failedReadTransactionsCount = 0L;
+        failedReadTransactionsCount.set(0);
 
-        abortTransactionsCount = 0L;
+        abortTransactionsCount.set(0);
 
     }
-
-
 }
index c16f8421bfb3f7ea8e75ab9bd366859693ff98a8..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 (file)
@@ -1,41 +0,0 @@
-package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
-
-/**
- * @author: syedbahm
- */
-public interface ShardStatsMBean {
-    String getShardName();
-
-    long getCommittedTransactionsCount();
-
-    String getLeader();
-
-    String getRaftState();
-
-    long getReadOnlyTransactionCount();
-
-    long getWriteOnlyTransactionCount();
-
-    long getReadWriteTransactionCount();
-
-    long getLastLogIndex();
-
-    long getLastLogTerm();
-
-    long getCurrentTerm();
-
-    long getCommitIndex();
-
-    long getLastApplied();
-
-    String getLastCommittedTransactionTime();
-
-    long getFailedTransactionsCount();
-
-    long getFailedReadTransactionsCount();
-
-    long getAbortTransactionsCount();
-
-    void resetTransactionCounters();
-
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
new file mode 100644 (file)
index 0000000..8deb0ae
--- /dev/null
@@ -0,0 +1,54 @@
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+
+/**
+ * @author: syedbahm
+ */
+public interface ShardStatsMXBean {
+
+   String getShardName();
+
+   long getCommittedTransactionsCount();
+
+   long getReadOnlyTransactionCount();
+
+   long getWriteOnlyTransactionCount();
+
+   long getReadWriteTransactionCount();
+
+   long getLastLogIndex();
+
+   long getLastLogTerm();
+
+   long getCurrentTerm();
+
+   long getCommitIndex();
+
+   long getLastApplied();
+
+   String getLastCommittedTransactionTime();
+
+   long getFailedTransactionsCount();
+
+   long getAbortTransactionsCount();
+
+   long getFailedReadTransactionsCount();
+
+   String getLeader();
+
+   String getRaftState();
+
+   ThreadExecutorStats getDataStoreExecutorStats();
+
+   ThreadExecutorStats getNotificationMgrExecutorStats();
+
+   List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats();
+
+   int getMaxNotificationMgrListenerQueueSize();
+
+   void resetTransactionCounters();
+}
index 0c609b459e19cee48bca50fea2a04db88101838f..99c8daf87d30af3ce66bf3b5c42aa86133ec5575 100644 (file)
@@ -8,44 +8,32 @@
 
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
 
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
-
 import java.util.List;
 
-public class ShardManagerInfo extends AbstractBaseMBean implements
-    ShardManagerInfoMBean {
-
-    private final String name;
-    private final List<String> localShards;
-
-    public ShardManagerInfo(String name, List<String> localShards) {
-        this.name = name;
-        this.localShards = localShards;
-    }
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 
+public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean {
 
-    @Override protected String getMBeanName() {
-        return name;
-    }
+    public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
 
-    @Override protected String getMBeanType() {
-        return JMX_TYPE_DISTRIBUTED_DATASTORE;
-    }
+    private final List<String> localShards;
 
-    @Override protected String getMBeanCategory() {
-        return JMX_CATEGORY_SHARD_MANAGER;
+    public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
+        super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
+        this.localShards = localShards;
     }
 
-    public static ShardManagerInfo createShardManagerMBean(String name, List<String> localShards){
-        ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name,
-            localShards);
+    public static ShardManagerInfo createShardManagerMBean(String name, String mxBeanType,
+            List<String> localShards){
+        ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name, mxBeanType, localShards);
 
         shardManagerInfo.registerMBean();
 
         return shardManagerInfo;
     }
 
-    @Override public List<String> getLocalShards() {
+    @Override
+    public List<String> getLocalShards() {
         return localShards;
     }
 }
index c5498ca228b3f846b5fd528b2da4dd2a4afef7a2..fc6bcff64a3c1afa691abb54650434c9f6af4886 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -46,4 +47,9 @@ public class ReadDataReply implements SerializableMessage{
     ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
     return new ReadDataReply(schemaContext,new NormalizedNodeToNodeCodec(schemaContext).decode(id, o.getNormalizedNode()));
   }
+
+  public static ByteString getNormalizedNodeByteString(Object serializable){
+      ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
+      return ((ShardTransactionMessages.ReadDataReply) serializable).getNormalizedNode().toByteString();
+  }
 }
index e2fbacb46169047764d24f8ba09f087693773d84..e7a7aab406677c53713e33b92d04f0c3a03f29aa 100644 (file)
@@ -1,9 +1,14 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.osgi.framework.BundleContext;
 
+import scala.concurrent.duration.Duration;
+
 public class DistributedConfigDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
     private BundleContext bundleContext;
@@ -35,13 +40,18 @@ public class DistributedConfigDataStoreProviderModule extends
             props = new ConfigProperties();
         }
 
-        return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
-                new DistributedDataStoreProperties(
+        DatastoreContext datastoreContext = new DatastoreContext("DistributedConfigDatastore",
+                InMemoryDOMDataStoreConfigProperties.create(
                         props.getMaxShardDataChangeExecutorPoolSize().getValue(),
                         props.getMaxShardDataChangeExecutorQueueSize().getValue(),
                         props.getMaxShardDataChangeListenerQueueSize().getValue(),
-                        props.getShardTransactionIdleTimeoutInMinutes().getValue(),
-                        props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+                Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+                        TimeUnit.MINUTES),
+                props.getOperationTimeoutInSeconds().getValue());
+
+        return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+                datastoreContext, bundleContext);
     }
 
     public void setBundleContext(BundleContext bundleContext) {
index c185e871ea46cd91f1176a43eda52e9b464d8bd2..814e6f606ac00bc311eb191c81497e46b56e2357 100644 (file)
@@ -1,9 +1,14 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.osgi.framework.BundleContext;
 
+import scala.concurrent.duration.Duration;
+
 public class DistributedOperationalDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
     private BundleContext bundleContext;
@@ -35,14 +40,18 @@ public class DistributedOperationalDataStoreProviderModule extends
             props = new OperationalProperties();
         }
 
-        return DistributedDataStoreFactory.createInstance("operational",
-                getOperationalSchemaServiceDependency(),
-                new DistributedDataStoreProperties(
+        DatastoreContext datastoreContext = new DatastoreContext("DistributedOperationalDatastore",
+                InMemoryDOMDataStoreConfigProperties.create(
                         props.getMaxShardDataChangeExecutorPoolSize().getValue(),
                         props.getMaxShardDataChangeExecutorQueueSize().getValue(),
                         props.getMaxShardDataChangeListenerQueueSize().getValue(),
-                        props.getShardTransactionIdleTimeoutInMinutes().getValue(),
-                        props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+                Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+                        TimeUnit.MINUTES),
+                props.getOperationTimeoutInSeconds().getValue());
+
+        return DistributedDataStoreFactory.createInstance("operational",
+                getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
     }
 
     public void setBundleContext(BundleContext bundleContext) {
index d50be2ca0ef8fdc8123e5b63a62887034bed0bb1..82bc5e29bc98465624ad181a6e74b06942e9ed1b 100644 (file)
@@ -66,7 +66,13 @@ module distributed-datastore-provider {
             type non-zero-uint16-type;
             description "The maximum queue size for each shard's data store data change listeners.";
          }
-         
+
+         leaf max-shard-data-store-executor-queue-size {
+            default 5000;
+            type non-zero-uint16-type;
+            description "The maximum queue size for each shard's data store executor.";
+         }
+            
          leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
             type non-zero-uint16-type;
index e23a76b0b223ee2b8ee0d5398da878cd520caf65..4c550a768cce258e3a151139f753751281b439d6 100644 (file)
@@ -22,11 +22,6 @@ public abstract class AbstractActorTest {
 
     @BeforeClass
     public static void setUpClass() throws IOException {
-        File journal = new File("journal");
-
-        if(journal.exists()) {
-            FileUtils.deleteDirectory(journal);
-        }
 
         System.setProperty("shard.persistent", "false");
         system = ActorSystem.create("test");
@@ -36,12 +31,21 @@ public abstract class AbstractActorTest {
     public static void tearDownClass() throws IOException {
         JavaTestKit.shutdownActorSystem(system);
         system = null;
+    }
 
+    protected static void deletePersistenceFiles() throws IOException {
         File journal = new File("journal");
 
         if(journal.exists()) {
             FileUtils.deleteDirectory(journal);
         }
+
+        File snapshots = new File("snapshots");
+
+        if(snapshots.exists()){
+            FileUtils.deleteDirectory(snapshots);
+        }
+
     }
 
     protected ActorSystem getSystem() {
index 6f131f301fe8985dade813ce2b00bf994c7a1bf6..50367e66ce3b759325d20228a63d0f7c2eeaab5d 100644 (file)
@@ -64,7 +64,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
             final SchemaContext schemaContext = TestModel.createTestContext();
             DatastoreContext datastoreContext = new DatastoreContext();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, datastoreContext);
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext());
             final ActorRef shard = getSystem().actorOf(props);
 
             new Within(duration("10 seconds")) {
index 21aa00e9e0b98c60b2d117fa67f36864e0c7700a..8a7b50d20c9003682b0ec6a2626b1c342ea73a03 100644 (file)
@@ -78,7 +78,7 @@ public class DistributedDataStoreIntegrationTest {
                             final DistributedDataStore distributedDataStore =
                                 new DistributedDataStore(getSystem(), "config",
                                         new MockClusterWrapper(), configuration,
-                                        new DistributedDataStoreProperties());
+                                        new DatastoreContext());
 
                             distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
 
index cb473cb9360e03656ea83d0212ef18b70237a92e..aeb47de888564f90830dc26a05c1ead1e66a78c1 100644 (file)
@@ -71,7 +71,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{
 
         new DistributedDataStore(actorSystem, "config",
             mock(ClusterWrapper.class), mock(Configuration.class),
-            new DistributedDataStoreProperties());
+            new DatastoreContext());
 
         verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
     }
index 1feefd1c1fbcde3381d91332bd8d977048e7e074..02201f7cd1672d2c8d02415ae4d65d8b71432f8e 100644 (file)
@@ -15,8 +15,10 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import scala.concurrent.duration.Duration;
 
 import static junit.framework.Assert.assertEquals;
@@ -71,6 +73,8 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
+            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
             new Within(duration("10 seconds")) {
                 @Override
                 protected void run() {
@@ -132,6 +136,8 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
+            subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
             new Within(duration("10 seconds")) {
                 @Override
                 protected void run() {
index 4466e50f96f53651be61a696deda5249692196e9..766dcb72681d3bc5ea945e0232fa25c32f1e76d5 100644 (file)
@@ -1,10 +1,15 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.testkit.JavaTestKit;
-
+import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -16,21 +21,32 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class ShardTest extends AbstractActorTest {
@@ -44,7 +60,7 @@ public class ShardTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT);
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
             final ActorRef subject =
                 getSystem().actorOf(props, "testCreateTransactionChain");
 
@@ -103,7 +119,7 @@ public class ShardTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT);
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
             final ActorRef subject =
                 getSystem().actorOf(props, "testRegisterChangeListener");
 
@@ -165,7 +181,7 @@ public class ShardTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT);
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
             final ActorRef subject =
                 getSystem().actorOf(props, "testCreateTransaction");
 
@@ -227,7 +243,7 @@ public class ShardTest extends AbstractActorTest {
                     .shardName("inventory").type("config").build();
 
             peerAddresses.put(identifier, null);
-            final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT);
+            final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
             final ActorRef subject =
                 getSystem().actorOf(props, "testPeerAddressResolved");
 
@@ -245,6 +261,157 @@ public class ShardTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testApplySnapshot() throws ExecutionException, InterruptedException {
+        Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+
+        final ShardIdentifier identifier =
+            ShardIdentifier.builder().memberName("member-1")
+                .shardName("inventory").type("config").build();
+
+        peerAddresses.put(identifier, null);
+        final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
+
+        TestActorRef<Shard> ref = TestActorRef.create(getSystem(), props);
+
+        ref.underlyingActor().updateSchemaContext(TestModel.createTestContext());
+
+        NormalizedNodeToNodeCodec codec =
+            new NormalizedNodeToNodeCodec(TestModel.createTestContext());
+
+        ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        NormalizedNode expected = ref.underlyingActor().readStore();
+
+        NormalizedNodeMessages.Container encode = codec
+            .encode(YangInstanceIdentifier.builder().build(), expected);
+
+
+        ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString());
+
+        NormalizedNode actual = ref.underlyingActor().readStore();
+
+        assertEquals(expected, actual);
+    }
+
+    private static class ShardTestKit extends JavaTestKit {
+
+        private ShardTestKit(ActorSystem actorSystem) {
+            super(actorSystem);
+        }
+
+        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+            // Wait for a specific log message to show up
+            final boolean result =
+                new JavaTestKit.EventFilter<Boolean>(logLevel
+                ) {
+                    @Override
+                    protected Boolean run() {
+                        return true;
+                    }
+                }.from(subject.path().toString())
+                    .message(logMessage)
+                    .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
+
+        }
+
+    }
+
+    @Test
+    public void testCreateSnapshot() throws IOException, InterruptedException {
+        new ShardTestKit(getSystem()) {{
+            final ShardIdentifier identifier =
+                ShardIdentifier.builder().memberName("member-1")
+                    .shardName("inventory").type("config").build();
+
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
+            final ActorRef subject =
+                getSystem().actorOf(props, "testCreateSnapshot");
+
+            // Wait for a specific log message to show up
+            this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader");
+
+
+            new Within(duration("3 seconds")) {
+                @Override
+                protected void run() {
+
+                    subject.tell(
+                        new UpdateSchemaContext(TestModel.createTestContext()),
+                        getRef());
+
+                    subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
+                        getRef());
+
+                    waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
+                }
+            };
+
+            Thread.sleep(2000);
+            deletePersistenceFiles();
+        }};
+    }
+
+    /**
+     * This test simply verifies that the applySnapShot logic will work
+     * @throws ReadFailedException
+     */
+    @Test
+    public void testInMemoryDataStoreRestore() throws ReadFailedException {
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
+            MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
+
+        store.onGlobalContextUpdated(TestModel.createTestContext());
+
+        DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
+        putTransaction.write(TestModel.TEST_PATH,
+            ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        commitTransaction(putTransaction);
+
+
+        NormalizedNode expected = readStore(store);
+
+        DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
+
+        writeTransaction.delete(YangInstanceIdentifier.builder().build());
+        writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
+
+        commitTransaction(writeTransaction);
+
+        NormalizedNode actual = readStore(store);
+
+        assertEquals(expected, actual);
+
+    }
+
+    private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
+        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
+            transaction.read(YangInstanceIdentifier.builder().build());
+
+        Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+
+        NormalizedNode<?, ?> normalizedNode = optional.get();
+
+        transaction.close();
+
+        return normalizedNode;
+    }
+
+    private void commitTransaction(DOMStoreWriteTransaction transaction) {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        ListenableFuture<Void> future =
+            commitCohort.preCommit();
+        try {
+            future.get();
+            future = commitCohort.commit();
+            future.get();
+        } catch (InterruptedException | ExecutionException e) {
+        }
+    }
+
     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
             @Override
index 71eb1f1603a7b00dfc576ec43f8816a41ef2a1ed..c5968c358f87824f3819edf739c5f9cdafcbb664 100644 (file)
@@ -9,6 +9,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -32,6 +33,8 @@ public class ShardTransactionChainTest extends AbstractActorTest {
 
     private static final String mockShardName = "mockShardName";
 
+    private final ShardStats shardStats = new ShardStats(mockShardName, "DataStore");
+
     @BeforeClass
     public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
@@ -41,7 +44,7 @@ public class ShardTransactionChainTest extends AbstractActorTest {
     public void testOnReceiveCreateTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final Props props = ShardTransactionChain.props(store.createTransactionChain(),
-                    testSchemaContext, DATA_STORE_CONTEXT, mockShardName);
+                    testSchemaContext, DATA_STORE_CONTEXT, shardStats);
             final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
 
             new Within(duration("1 seconds")) {
@@ -79,7 +82,7 @@ public class ShardTransactionChainTest extends AbstractActorTest {
     public void testOnReceiveCloseTransactionChain() throws Exception {
         new JavaTestKit(getSystem()) {{
             final Props props = ShardTransactionChain.props(store.createTransactionChain(),
-                    testSchemaContext, DATA_STORE_CONTEXT,mockShardName );
+                    testSchemaContext, DATA_STORE_CONTEXT, shardStats );
             final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
 
             new Within(duration("1 seconds")) {
index 4fe60f6467d2dab4db24dd34df03b4b6d9c9c7d8..869f47578711ea313181e4011444dc82ca35feaa 100644 (file)
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -59,19 +60,24 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
     private final DatastoreContext datastoreContext = new DatastoreContext();
 
+    private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
+
     @BeforeClass
     public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
     }
 
+    private ActorRef createShard(){
+        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext()));
+    }
+
     @Test(expected = ReadFailedException.class)
     public void testNegativeReadWithReadOnlyTransactionClosed()
         throws Throwable {
 
-        final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+        final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -98,10 +104,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     public void testNegativeReadWithReadWriteTransactionClosed()
         throws Throwable {
 
-        final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+        final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -128,10 +133,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     public void testNegativeExistsWithReadWriteTransactionClosed()
         throws Throwable {
 
-        final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+        final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -158,10 +162,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     public void testNegativeWriteWithTransactionReady() throws Exception {
 
 
-        final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+        final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -191,10 +194,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     public void testNegativeReadWriteWithTransactionReady() throws Exception {
 
 
-        final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+        final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -229,10 +231,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     public void testNegativeMergeTransactionReady() throws Exception {
 
 
-        final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+        final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -262,10 +263,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
 
 
-        final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
+        final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
index ff2ee08f94ee610afeef6a0f8d069d7adfe79952..0beb00b435ebe622d888dd58b40d938877c4d612 100644 (file)
@@ -13,6 +13,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -62,18 +63,24 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private DatastoreContext datastoreContext = new DatastoreContext();
 
+    private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
+
     @BeforeClass
     public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
     }
 
+    private ActorRef createShard(){
+        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+            Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext()));
+    }
+
     @Test
     public void testOnReceiveReadData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, new DatastoreContext()));
+            final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject = getSystem().actorOf(props, "testReadData");
 
             new Within(duration("1 seconds")) {
@@ -113,10 +120,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, new DatastoreContext()));
+            final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
 
             new Within(duration("1 seconds")) {
@@ -157,10 +163,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDataExistsPositive() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, new DatastoreContext()));
+            final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
 
             new Within(duration("1 seconds")) {
@@ -200,10 +205,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDataExistsNegative() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, new DatastoreContext()));
+            final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
 
             new Within(duration("1 seconds")) {
@@ -278,10 +282,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveWriteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, new DatastoreContext()));
+            final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testWriteData");
 
@@ -319,10 +322,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveMergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, new DatastoreContext()));
+            final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testMergeData");
 
@@ -361,10 +363,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDeleteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, new DatastoreContext()));
+            final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testDeleteData");
 
@@ -401,10 +402,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, new DatastoreContext()));
+            final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testReadyTransaction");
 
@@ -440,10 +440,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, new DatastoreContext()));
+            final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testCloseTransaction");
 
@@ -491,10 +490,9 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     @Test(expected=UnknownMessageException.class)
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
-        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                Collections.EMPTY_MAP, new DatastoreContext()));
+        final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
         final TestActorRef subject = TestActorRef.apply(props,getSystem());
 
         subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@@ -503,14 +501,14 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testShardTransactionInactivity() {
 
-        datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
-                Duration.create(500, TimeUnit.MILLISECONDS));
+        datastoreContext = new DatastoreContext("Test",
+                InMemoryDOMDataStoreConfigProperties.getDefault(),
+                Duration.create(500, TimeUnit.MILLISECONDS), 5);
 
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, new DatastoreContext()));
+            final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
index e39b9abd65a711e333f9c0315b25de2e3457266a..4e4c34bcbc75fe0e74369ae407d83624c3438ae0 100644 (file)
@@ -23,6 +23,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@ -66,6 +67,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
 
     private final DatastoreContext datastoreContext = new DatastoreContext();
 
+    private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
     @BeforeClass
     public static void staticSetup() {
@@ -74,18 +76,20 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
 
     private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
 
+    private ActorRef createShard(){
+        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
+    }
 
     @Test(expected = TestException.class)
     public void testNegativeAbortResultsInException() throws Exception {
 
-        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                Collections.EMPTY_MAP, datastoreContext));
+        final ActorRef shard = createShard();
         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
             .mock(DOMStoreThreePhaseCommitCohort.class);
         final CompositeModification mockComposite =
             Mockito.mock(CompositeModification.class);
         final Props props =
-            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
 
         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
             .create(getSystem(), props,
@@ -107,14 +111,13 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
     @Test(expected = OptimisticLockFailedException.class)
     public void testNegativeCanCommitResultsInException() throws Exception {
 
-        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                Collections.EMPTY_MAP, datastoreContext));
+        final ActorRef shard = createShard();
         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
             .mock(DOMStoreThreePhaseCommitCohort.class);
         final CompositeModification mockComposite =
             Mockito.mock(CompositeModification.class);
         final Props props =
-            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
 
         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
             .create(getSystem(), props,
@@ -139,14 +142,13 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
     @Test(expected = TestException.class)
     public void testNegativePreCommitResultsInException() throws Exception {
 
-        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                Collections.EMPTY_MAP, datastoreContext));
+        final ActorRef shard = createShard();
         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
             .mock(DOMStoreThreePhaseCommitCohort.class);
         final CompositeModification mockComposite =
             Mockito.mock(CompositeModification.class);
         final Props props =
-            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
 
         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
             .create(getSystem(), props,
@@ -170,12 +172,12 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
     public void testNegativeCommitResultsInException() throws Exception {
 
         final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
-                Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext),
+                Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()),
                 "testNegativeCommitResultsInException");
 
         final ActorRef shardTransaction =
             getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()));
+                    testSchemaContext, datastoreContext, shardStats));
 
         ShardTransactionMessages.WriteData writeData =
             ShardTransactionMessages.WriteData.newBuilder()
index c4d0b85fb54546faa5e0667e58e472d549d5836f..e9df3ecd49ed83a5d7021d86487277545bbd5787 100644 (file)
@@ -11,10 +11,12 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+
+import java.lang.management.ManagementFactory;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
@@ -26,11 +28,11 @@ public class ShardStatsTest {
     @Before
     public void setUp() throws Exception {
 
-        shardStats = new ShardStats("shard-1");
+        shardStats = new ShardStats("shard-1", "DataStore");
         shardStats.registerMBean();
-        mbeanServer = shardStats.getMBeanServer();
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
         String objectName =
-            AbstractBaseMBean.BASE_JMX_PREFIX + "type=" + shardStats
+            AbstractMXBean.BASE_JMX_PREFIX + "type=" + shardStats
                 .getMBeanType() + ",Category=" +
                 shardStats.getMBeanCategory() + ",name=" +
                 shardStats.getMBeanName();
@@ -46,7 +48,7 @@ public class ShardStatsTest {
     public void testGetShardName() throws Exception {
 
         Object attribute = mbeanServer.getAttribute(testMBeanName, "ShardName");
-        Assert.assertEquals((String) attribute, "shard-1");
+        Assert.assertEquals(attribute, "shard-1");
 
     }
 
@@ -60,7 +62,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         Object attribute = mbeanServer.getAttribute(testMBeanName,
             "CommittedTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 3L);
+        Assert.assertEquals(attribute, 3L);
 
 
     }
@@ -71,13 +73,13 @@ public class ShardStatsTest {
         Assert.assertEquals(shardStats.getLastCommittedTransactionTime(),
             sdf.format(new Date(0L)));
         long millis = System.currentTimeMillis();
-        shardStats.setLastCommittedTransactionTime(new Date(millis));
+        shardStats.setLastCommittedTransactionTime(millis);
 
         //now let us get from MBeanServer what is the transaction count.
         Object attribute = mbeanServer.getAttribute(testMBeanName,
             "LastCommittedTransactionTime");
-        Assert.assertEquals((String) attribute, sdf.format(new Date(millis)));
-        Assert.assertNotEquals((String) attribute,
+        Assert.assertEquals(attribute, sdf.format(new Date(millis)));
+        Assert.assertNotEquals(attribute,
             sdf.format(new Date(millis - 1)));
 
     }
@@ -92,7 +94,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         Object attribute =
             mbeanServer.getAttribute(testMBeanName, "FailedTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 2L);
+        Assert.assertEquals(attribute, 2L);
     }
 
     @Test
@@ -105,7 +107,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         Object attribute =
             mbeanServer.getAttribute(testMBeanName, "AbortTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 2L);
+        Assert.assertEquals(attribute, 2L);
     }
 
     @Test
@@ -118,7 +120,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         Object attribute =
             mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 2L);
+        Assert.assertEquals(attribute, 2L);
     }
 
     @Test
@@ -132,7 +134,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         Object attribute = mbeanServer.getAttribute(testMBeanName,
             "CommittedTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 3L);
+        Assert.assertEquals(attribute, 3L);
 
         //let us increment FailedReadTransactions count and then check
         shardStats.incrementFailedReadTransactionsCount();
@@ -142,7 +144,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         attribute =
             mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 2L);
+        Assert.assertEquals(attribute, 2L);
 
 
         //here we will reset the counters and check the above ones are 0 after reset
@@ -151,11 +153,11 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         attribute = mbeanServer.getAttribute(testMBeanName,
             "CommittedTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 0L);
+        Assert.assertEquals(attribute, 0L);
 
         attribute =
             mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 0L);
+        Assert.assertEquals(attribute, 0L);
 
 
     }
index b98844ba641f6e7f585c3c6b687c56d55161933c..1ab12ff26f1bddc451f4aa3ceeddd111b8546f19 100644 (file)
@@ -22,9 +22,8 @@ public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.cont
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-
-        InMemoryDOMDataStore dataStore = InMemoryDOMDataStoreFactory.create(
-                "DOM-CFG", getSchemaServiceDependency(),
+        InMemoryDOMDataStore dataStore = InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency(),
+                getDebugTransactions(),
                 InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
                         getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
                         getMaxDataStoreExecutorQueueSize()));
index 4532452c65e32521dece928f5b20f00310f3c358..9358552579bb155d76357bbbdbbe7e94eab98f56 100644 (file)
@@ -23,7 +23,7 @@ public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight
     @Override
     public java.lang.AutoCloseable createInstance() {
         InMemoryDOMDataStore dataStore = InMemoryDOMDataStoreFactory.create("DOM-OPER", getSchemaServiceDependency(),
-                InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
+                getDebugTransactions(), InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
                         getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
                         getMaxDataStoreExecutorQueueSize()));
 
index 8a190c115fb2c68bd99228378ed389620af94023..6cc593904723ee6cf7a28f26fd0367703ec89371 100644 (file)
@@ -7,25 +7,26 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Preconditions;
 
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.slf4j.Logger;
+
 /**
  * Abstract DOM Store Transaction
  *
  * Convenience super implementation of DOM Store transaction which provides
  * common implementation of {@link #toString()} and {@link #getIdentifier()}.
- *
- *
  */
 abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
+    private final Throwable debugContext;
     private final Object identifier;
 
-    protected AbstractDOMStoreTransaction(final Object identifier) {
-        this.identifier = Preconditions.checkNotNull(identifier,"Identifier must not be null.");
+    protected AbstractDOMStoreTransaction(final Object identifier, final boolean debug) {
+        this.identifier = Preconditions.checkNotNull(identifier, "Identifier must not be null.");
+        this.debugContext = debug ? new Throwable().fillInStackTrace() : null;
     }
 
     @Override
@@ -33,6 +34,12 @@ abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
         return identifier;
     }
 
+    protected final void warnDebugContext(final Logger logger) {
+        if (debugContext != null) {
+            logger.warn("Transaction {} has been allocated in the following context", identifier, debugContext);
+        }
+    }
+
     @Override
     public final String toString() {
         return addToStringAttributes(Objects.toStringHelper(this)).toString();
index 476356a19e79113b0b0121662b8842610757d12f..3e748618169889cdfcf0b9f8eb66c73e65cb4338 100644 (file)
@@ -16,14 +16,11 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.annotation.concurrent.GuardedBy;
-
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
@@ -91,7 +88,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     private final ExecutorService dataChangeListenerExecutor;
 
     private final ExecutorService domStoreExecutor;
-
+    private final boolean debugTransactions;
     private final String name;
 
     private volatile AutoCloseable closeable;
@@ -99,15 +96,17 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
             final ExecutorService dataChangeListenerExecutor) {
         this(name, domStoreExecutor, dataChangeListenerExecutor,
-             InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
+             InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
     }
 
     public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
-            final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize) {
+            final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize,
+            final boolean debugTransactions) {
         this.name = Preconditions.checkNotNull(name);
         this.domStoreExecutor = Preconditions.checkNotNull(domStoreExecutor);
         this.listeningExecutor = MoreExecutors.listeningDecorator(this.domStoreExecutor);
         this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
+        this.debugTransactions = debugTransactions;
 
         dataChangeListenerNotificationManager =
                 new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
@@ -134,17 +133,17 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new SnapshotBackedReadTransaction(nextIdentifier(), dataTree.takeSnapshot());
+        return new SnapshotBackedReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot());
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
+        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new SnapshotBackedWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
+        return new SnapshotBackedWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
     }
 
     @Override
@@ -171,6 +170,10 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         }
     }
 
+    boolean getDebugTransactions() {
+        return debugTransactions;
+    }
+
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
             final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
@@ -242,7 +245,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             } else {
                 snapshot = dataTree.takeSnapshot();
             }
-            return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+            return new SnapshotBackedReadTransaction(nextIdentifier(), getDebugTransactions(), snapshot);
         }
 
         @Override
@@ -256,7 +259,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
                 snapshot = dataTree.takeSnapshot();
             }
             final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
-                    snapshot, this);
+                    getDebugTransactions(), snapshot, this);
             latestOutstandingTx = ret;
             return ret;
         }
@@ -271,8 +274,8 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             } else {
                 snapshot = dataTree.takeSnapshot();
             }
-            final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,
-                    this);
+            final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(),
+                    getDebugTransactions(), snapshot, this);
             latestOutstandingTx = ret;
             return ret;
         }
@@ -384,10 +387,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
                     } catch (ConflictingModificationAppliedException e) {
                         LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
                                 e.getPath());
+                        transaction.warnDebugContext(LOG);
                         throw new OptimisticLockFailedException("Optimistic lock failed.",e);
                     } catch (DataValidationFailedException e) {
                         LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
                                 e.getPath(), e);
+                        transaction.warnDebugContext(LOG);
                         throw new TransactionCommitFailedException("Data did not pass validation.",e);
                     }
                 }
index 052fb2b89ba563c7be55612d75b48625f31de840..dc1482c6abaefb7880c7f6b55cc37c4d6ad65e3f 100644 (file)
@@ -5,13 +5,10 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import java.util.concurrent.ExecutorService;
-
 import javax.annotation.Nullable;
-
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
 
@@ -42,6 +39,22 @@ public final class InMemoryDOMDataStoreFactory {
     public static InMemoryDOMDataStore create(final String name,
             @Nullable final SchemaService schemaService,
             @Nullable final InMemoryDOMDataStoreConfigProperties properties) {
+        return create(name, schemaService, false, properties);
+    }
+
+    /**
+     * Creates an InMemoryDOMDataStore instance.
+     *
+     * @param name the name of the data store
+     * @param schemaService the SchemaService to which to register the data store.
+     * @param debugTransactions enable transaction debugging
+     * @param properties configuration properties for the InMemoryDOMDataStore instance. If null,
+     *                   default property values are used.
+     * @return an InMemoryDOMDataStore instance
+     */
+    public static InMemoryDOMDataStore create(final String name,
+            @Nullable final SchemaService schemaService, final boolean debugTransactions,
+            @Nullable final InMemoryDOMDataStoreConfigProperties properties) {
 
         InMemoryDOMDataStoreConfigProperties actualProperties = properties;
         if(actualProperties == null) {
@@ -64,7 +77,7 @@ public final class InMemoryDOMDataStoreFactory {
 
         InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
                 domStoreExecutor, dataChangeListenerExecutor,
-                actualProperties.getMaxDataChangeListenerQueueSize());
+                actualProperties.getMaxDataChangeListenerQueueSize(), debugTransactions);
 
         if(schemaService != null) {
             schemaService.registerSchemaContextListener(dataStore);
index 2caa76d49dff02a34ad98232ccdc7db139f07f2c..ed95796499b3e9a6887d5708a76d636934aa9027 100644 (file)
@@ -7,10 +7,13 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
+
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -19,8 +22,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  *
  * Implementation of read-only transaction backed by {@link DataTreeSnapshot}
@@ -35,8 +36,8 @@ final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class);
     private volatile DataTreeSnapshot stableSnapshot;
 
-    public SnapshotBackedReadTransaction(final Object identifier, final DataTreeSnapshot snapshot) {
-        super(identifier);
+    public SnapshotBackedReadTransaction(final Object identifier, final boolean debug, final DataTreeSnapshot snapshot) {
+        super(identifier, debug);
         this.stableSnapshot = Preconditions.checkNotNull(snapshot);
         LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot);
     }
@@ -66,7 +67,7 @@ final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction
     }
 
     @Override
-    public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
+    public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
         LOG.debug("Tx: {} Exists: {}", getIdentifier(), path);
         checkNotNull(path, "Path must not be null.");
 
index ce7043fd4747542a98802a893d0d49c3c12b3de5..2ae7425bbb9de443d4cdae61dc90dbc94ba61234 100644 (file)
@@ -9,19 +9,19 @@ package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-
 /**
  * Implementation of Read-Write transaction which is backed by {@link DataTreeSnapshot}
  * and executed according to {@link TransactionReadyPrototype}.
@@ -39,9 +39,9 @@ class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction
      * @param snapshot Snapshot which will be modified.
      * @param readyImpl Implementation of ready method.
      */
-    protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataTreeSnapshot snapshot,
-            final TransactionReadyPrototype store) {
-        super(identifier, snapshot, store);
+    protected SnapshotBackedReadWriteTransaction(final Object identifier, final boolean debug,
+            final DataTreeSnapshot snapshot, final TransactionReadyPrototype store) {
+        super(identifier, debug, snapshot, store);
     }
 
     @Override
@@ -62,8 +62,8 @@ class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction
         }
     }
 
-    @Override public CheckedFuture<Boolean, ReadFailedException> exists(
-        YangInstanceIdentifier path) {
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
         try {
             return Futures.immediateCheckedFuture(
                 read(path).checkedGet().isPresent());
index 34532ab98fdc45502e3794a8ea967e4e3da4f805..6129df74787b2fdb8b2da30d9cc2c78f04b288bc 100644 (file)
@@ -9,20 +9,19 @@ package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.base.Objects.ToStringHelper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-
 /**
  * Implementation of Write transaction which is backed by
  * {@link DataTreeSnapshot} and executed according to
@@ -46,9 +45,9 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction impleme
      * @param readyImpl
      *            Implementation of ready method.
      */
-    public SnapshotBackedWriteTransaction(final Object identifier, final DataTreeSnapshot snapshot,
-            final TransactionReadyPrototype readyImpl) {
-        super(identifier);
+    public SnapshotBackedWriteTransaction(final Object identifier, final boolean debug,
+            final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) {
+        super(identifier, debug);
         mutableTree = snapshot.newModification();
         this.readyImpl = Preconditions.checkNotNull(readyImpl, "readyImpl must not be null.");
         LOG.debug("Write Tx: {} allocated with snapshot {}", identifier, snapshot);
index 7d19a64446b519ea7795e86d81c6c431599ef7d9..5ffe4d60ca879c58e562557c1f4a78ce9638f7d6 100644 (file)
@@ -1,4 +1,3 @@
-
 module opendaylight-inmemory-datastore-provider {
 
     yang-version 1;
@@ -7,8 +6,8 @@ module opendaylight-inmemory-datastore-provider {
 
     import config { prefix config; revision-date 2013-04-05; }
     import rpc-context { prefix rpcx; revision-date 2013-06-17; }
-       import opendaylight-config-dom-datastore {prefix config-dom-store-spi;}
-       import opendaylight-operational-dom-datastore {prefix operational-dom-store-spi;}
+    import opendaylight-config-dom-datastore {prefix config-dom-store-spi;}
+    import opendaylight-operational-dom-datastore {prefix operational-dom-store-spi;}
     import opendaylight-md-sal-dom {prefix sal;}
 
     description
@@ -28,11 +27,11 @@ module opendaylight-inmemory-datastore-provider {
 
         // This is the definition of the service implementation as a module identity.
 
-     identity inmemory-operational-datastore-provider {
-             base config:module-type;
-             config:provided-service operational-dom-store-spi:operational-dom-datastore;
-             config:java-name-prefix InMemoryOperationalDataStoreProvider;
-      }
+    identity inmemory-operational-datastore-provider {
+            base config:module-type;
+            config:provided-service operational-dom-store-spi:operational-dom-datastore;
+            config:java-name-prefix InMemoryOperationalDataStoreProvider;
+    }
 
     grouping datastore-configuration {
             leaf max-data-change-executor-queue-size {
@@ -52,12 +51,16 @@ module opendaylight-inmemory-datastore-provider {
                 type uint16;
                 description "The maximum queue size for the data change listeners.";
             }
-
             leaf max-data-store-executor-queue-size {
                 default 5000;
                 type uint16;
                 description "The maximum queue size for the data store executor.";
             }
+            leaf debug-transactions {
+                type boolean;
+                default false;
+                description "Enable transaction lifecycle debugging.";
+            }
     }
 
     // Augments the 'configuration' choice node under modules/module.
index c609e13e791c375979ad99bce62ed27df9d321f8..04e19493dbb98051118553cd4573cc00d390bdd2 100644 (file)
@@ -7,10 +7,18 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.concurrent.ExecutionException;
+
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -32,13 +40,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-import java.util.concurrent.ExecutionException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 
 public class InMemoryDataStoreTest {
 
@@ -271,7 +272,7 @@ public class InMemoryDataStoreTest {
         Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockSnapshot )
         .readNode( Mockito.any( YangInstanceIdentifier.class ) );
 
-        DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction( "1", mockSnapshot );
+        DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction("1", true, mockSnapshot);
 
         doReadAndThrowEx( readTx );
     }
@@ -296,12 +297,12 @@ public class InMemoryDataStoreTest {
         .readNode( Mockito.any( YangInstanceIdentifier.class ) );
         Mockito.doReturn( mockModification ).when( mockSnapshot ).newModification();
         TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class );
-        DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction( "1", mockSnapshot, mockReady );
+        DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction("1", false, mockSnapshot, mockReady);
 
         doReadAndThrowEx( readTx );
     }
 
-    private void doReadAndThrowEx( DOMStoreReadTransaction readTx ) throws Throwable {
+    private void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable {
 
         try {
             readTx.read(TestModel.TEST_PATH).get();
index 3800413eb1b9964d00207f526177862eb69c5885..1cf648eb975c521d6c81e22b927e8f276f888e99 100644 (file)
@@ -13,21 +13,27 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class OperationProcessor implements Runnable {
+final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
     private static final int MAX_TRANSACTION_OPERATIONS = 100;
     private static final int OPERATION_QUEUE_DEPTH = 500;
 
     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
     private final DataBroker dataBroker;
+    private final BindingTransactionChain transactionChain;
 
     OperationProcessor(final DataBroker dataBroker) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
+        transactionChain = this.dataBroker.createTransactionChain(this);
     }
 
     void enqueueOperation(final TopologyOperation task) {
@@ -45,7 +51,8 @@ final class OperationProcessor implements Runnable {
                 TopologyOperation op = queue.take();
 
                 LOG.debug("New operations available, starting transaction");
-                final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
+                final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
+
 
                 int ops = 0;
                 do {
@@ -83,4 +90,22 @@ final class OperationProcessor implements Runnable {
             queue.poll();
         }
     }
+
+    @Override
+    public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
+        LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause);
+    }
+
+    @Override
+    public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+        //NOOP
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (transactionChain != null) {
+            transactionChain.close();
+        }
+
+    }
 }