Merge "BUG 2221 : Add metering to ShardTransaction actor"
authorMoiz Raja <moraja@cisco.com>
Wed, 29 Oct 2014 03:05:09 +0000 (03:05 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 29 Oct 2014 03:05:10 +0000 (03:05 +0000)
112 files changed:
features/base/pom.xml
features/base/src/main/resources/features.xml
features/netconf-connector/pom.xml
features/netconf/pom.xml
features/netconf/src/main/resources/features.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java
opendaylight/config/config-api/src/main/yang/config.yang
opendaylight/config/shutdown-impl/src/main/java/org/opendaylight/controller/config/yang/shutdown/impl/ShutdownModuleFactory.java
opendaylight/config/shutdown-impl/src/main/java/org/opendaylight/controller/config/yang/shutdown/impl/ShutdownServiceImpl.java
opendaylight/distribution/opendaylight/pom.xml
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeMapping.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/NodeToNormalizedNodeBuilder.java [deleted file]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/NodeValueCodec.java [deleted file]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/NormalizedNodeToProtocolBufferNode.java [deleted file]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/NodeIdentifierFactory.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NodeTypes.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamWriter.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/PathArgumentTypes.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ValueTypes.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/util/EncoderDecoderUtil.java [deleted file]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeWriter.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/SampleNormalizedNodeSerializable.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/util/NormalizedNodeXmlConverterTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpCohort.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/LocalShardNotFoundException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/md/sal/dom/broker/spi/rpc/RpcRoutingStrategy.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLAdapter.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java
opendaylight/md-sal/samples/toaster-provider/src/main/java/org/opendaylight/controller/sample/toaster/provider/OpendaylightToaster.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/mapping/runtime/InstanceRuntime.java
opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/NetconfMappingTest.java
opendaylight/netconf/netconf-artifacts/pom.xml [new file with mode: 0644]
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/SubtreeFilter.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/SubtreeFilterTest.java
opendaylight/netconf/netconf-impl/src/test/resources/subtree/9/post-filter.xml [new file with mode: 0644]
opendaylight/netconf/netconf-impl/src/test/resources/subtree/9/pre-filter.xml [new file with mode: 0644]
opendaylight/netconf/netconf-impl/src/test/resources/subtree/9/request.xml [new file with mode: 0644]
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java
opendaylight/netconf/netconf-netty-util/pom.xml
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java [moved from opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java with 66% similarity]
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java
opendaylight/netconf/netconf-ssh/pom.xml
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/NetconfSSHServer.java [deleted file]
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/RemoteNetconfCommand.java [new file with mode: 0644]
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/SshProxyClientHandler.java [new file with mode: 0644]
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/SshProxyServer.java [new file with mode: 0644]
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/authentication/PEMGenerator.java [deleted file]
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/AuthProviderTracker.java [new file with mode: 0644]
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/NetconfSSHActivator.java
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/threads/Handshaker.java [deleted file]
opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java
opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/ssh/authentication/SSHServerTest.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/xml/XmlElement.java
opendaylight/netconf/pom.xml
pom.xml
third-party/ganymed/pom.xml [deleted file]
third-party/ganymed/src/main/java/ch/ethz/ssh2/Connection.java [deleted file]
third-party/ganymed/src/main/java/ch/ethz/ssh2/channel/ChannelManager.java [deleted file]
third-party/ganymed/src/main/java/ch/ethz/ssh2/transport/TransportManager.java [deleted file]

index b7ab3ca..cd84eea 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>karaf-tomcat-security</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller.thirdparty</groupId>
-      <artifactId>ganymed</artifactId>
-    </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
index c324f6c..d6802ac 100644 (file)
@@ -36,7 +36,6 @@
       <bundle>wrap:mvn:io.netty/netty-common/${netty.version}</bundle>
       <bundle>wrap:mvn:io.netty/netty-handler/${netty.version}</bundle>
       <bundle>wrap:mvn:io.netty/netty-codec-http/${netty.version}</bundle>
-      <bundle>mvn:org.opendaylight.controller.thirdparty/ganymed/1.2.0-SNAPSHOT</bundle>
    </feature>
    <feature name="odl-base-jersey" description="Jersey" version="${jersey.version}">
       <feature>odl-base-gemini-web</feature>
@@ -80,6 +79,7 @@
    <feature name="odl-base-eclipselink-persistence" description="EclipseLink Persistence API" version="2.0.4.v201112161009">
       <bundle start="true">mvn:eclipselink/javax.persistence/2.0.4.v201112161009</bundle>
       <bundle start="true">mvn:eclipselink/javax.resource/1.5.0.v200906010428</bundle>
+      <bundle start="true">mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/2.5.0</bundle>
       <bundle start="true">mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/2.5.0</bundle>
       <bundle start="true">mvn:org.eclipse.persistence/org.eclipse.persistence.core/2.5.0</bundle>
    </feature>
index 03d6fed..c69ee19 100644 (file)
       Optional TODO: Remove TODO comments.
     -->
     <!-- test to validate features.xml -->
+   <!--FIXME BUG-2195 When running single feature tests for netconf connector, features including ssh proxy server always fail (this behavior does not appear when running karaf distro directly)-->
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>features-test</artifactId>
index a944bb4..028c16b 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-netty-util</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller.thirdparty</groupId>
-      <artifactId>ganymed</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.sshd</groupId>
       <artifactId>sshd-core</artifactId>
index 444f208..fb668ae 100644 (file)
@@ -57,7 +57,6 @@
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <feature version='${project.version}'>odl-netconf-util</feature>
     <bundle>mvn:org.opendaylight.controller/netconf-netty-util/${project.version}</bundle>
-    <bundle>mvn:org.opendaylight.controller.thirdparty/ganymed/${ganymed.version}</bundle>
     <bundle>mvn:org.apache.sshd/sshd-core/${sshd-core.version}</bundle>
     <bundle>mvn:org.openexi/nagasena/${exi.nagasena.version}</bundle>
     <bundle>mvn:io.netty/netty-codec/${netty.version}</bundle>
index ffb9ef7..198d17a 100644 (file)
     <forwarding.staticrouting.northbound.version>0.5.0-SNAPSHOT</forwarding.staticrouting.northbound.version>
     <forwardingrulesmanager.implementation.version>0.5.0-SNAPSHOT</forwardingrulesmanager.implementation.version>
     <forwardingrulesmanager.version>0.7.0-SNAPSHOT</forwardingrulesmanager.version>
-    <ganymed.version>1.2.0-SNAPSHOT</ganymed.version>
     <hosttracker.api.version>0.6.0-SNAPSHOT</hosttracker.api.version>
     <hosttracker.implementation.version>0.6.0-SNAPSHOT</hosttracker.implementation.version>
     <hosttracker.northbound.version>0.5.0-SNAPSHOT</hosttracker.northbound.version>
     <sonar.language>java</sonar.language>
     <sonar.jacoco.reportPath>target/code-coverage/jacoco.exec</sonar.jacoco.reportPath>
     <sonar.jacoco.itReportPath>target/code-coverage/jacoco-it.exec</sonar.jacoco.itReportPath>
-    <sonar.skippedModules>org.openflow.openflowj,net.sf.jung2,org.opendaylight.controller.protobuff.messages,ch.ethz.ssh2</sonar.skippedModules>
+    <sonar.skippedModules>org.openflow.openflowj,net.sf.jung2,org.opendaylight.controller.protobuff.messages</sonar.skippedModules>
     <sonar.profile>Sonar way with Findbugs</sonar.profile>
     <spifly.version>1.0.0</spifly.version>
     <spring-osgi.version>1.2.1</spring-osgi.version>
 
   <dependencyManagement>
     <dependencies>
+
        <!-- project specific dependencies -->
       <dependency>
-        <groupId>${project.groupId}</groupId>
-        <artifactId>ietf-netconf-monitoring</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>${project.groupId}</groupId>
-        <artifactId>ietf-netconf-monitoring-extension</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>${project.groupId}</groupId>
-        <artifactId>netconf-netty-util</artifactId>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>netconf-artifacts</artifactId>
         <version>${netconf.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
       </dependency>
       <dependency>
         <groupId>org.apache.sshd</groupId>
         <artifactId>config-manager</artifactId>
         <version>${config.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>config-netconf-connector</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>config-persister-api</artifactId>
         <artifactId>config-persister-feature-adapter</artifactId>
         <version>${config.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>config-persister-impl</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
 
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <version>${dummy-console.version}</version>
       </dependency>
 
-      <!-- Netconf -->
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-api</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-client</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-client</artifactId>
-        <version>${netconf.version}</version>
-        <type>test-jar</type>
-      </dependency>
-
-      <!--Netconf config-->
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-config-dispatcher</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-impl</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-impl</artifactId>
-        <version>${netconf.version}</version>
-        <type>test-jar</type>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-mapping-api</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-monitoring</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-netty-util</artifactId>
-        <version>${netconf.version}</version>
-        <type>test-jar</type>
-      </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>netconf-auth</artifactId>
-            <version>${netconf.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>netconf-usermanager</artifactId>
-            <version>${netconf.version}</version>
-        </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-ssh</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-ssh</artifactId>
-        <version>${netconf.version}</version>
-        <type>test-jar</type>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-tcp</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-util</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-util</artifactId>
-        <version>${netconf.version}</version>
-        <type>test-jar</type>
-      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>netty-config-api</artifactId>
         <artifactId>md-sal-config</artifactId>
         <version>${mdsal.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-config</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>netconf-connector-config</artifactId>
-        <version>${netconf.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>sal-rest-docgen</artifactId>
         <artifactId>com.sun.jersey.jersey-servlet</artifactId>
         <version>${jersey-servlet.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller.thirdparty</groupId>
-        <artifactId>ganymed</artifactId>
-        <version>${ganymed.version}</version>
-      </dependency>
+
       <!-- Third parties from opendaylight released -->
       <dependency>
         <groupId>org.opendaylight.controller.thirdparty</groupId>
         <artifactId>org.openflow.openflowj</artifactId>
         <version>1.0.2</version>
       </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>binding-generator-impl</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>binding-data-codec</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>binding-generator-spi</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>binding-generator-util</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>binding-type-provider</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>concepts</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>object-cache-api</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>object-cache-guava</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>restconf-client-api</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>restconf-client-impl</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>util</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-data-composite-node</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-data-codec-gson</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
 
-      <!-- yangtools dependencies -->
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-binding</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-common</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-data-api</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-data-impl</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
+      <!-- yangtools artifacts -->
       <dependency>
         <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-data-util</artifactId>
+        <artifactId>yangtools-artifacts</artifactId>
         <version>${yangtools.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
       </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-maven-plugin-spi</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-model-api</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-model-util</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-parser-api</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>yang-parser-impl</artifactId>
-        <version>${yangtools.version}</version>
-      </dependency>
-      <!-- yang model dependencies -->
-      <dependency>
-        <groupId>org.opendaylight.yangtools.model</groupId>
-        <artifactId>ietf-inet-types</artifactId>
-        <version>${ietf-inet-types.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools.model</groupId>
-        <artifactId>ietf-restconf</artifactId>
-        <version>${ietf-restconf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools.model</groupId>
-        <artifactId>ietf-topology</artifactId>
-        <version>${ietf-topology.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools.model</groupId>
-        <artifactId>ietf-topology-l3-unicast-igp</artifactId>
-        <version>${ietf-topology.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools.model</groupId>
-        <artifactId>ietf-yang-types</artifactId>
-        <version>${ietf-yang-types.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools.model</groupId>
-        <artifactId>ietf-yang-types-20130715</artifactId>
-        <version>2013.07.15.7-SNAPSHOT</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools.model</groupId>
-        <artifactId>opendaylight-l2-types</artifactId>
-        <version>${opendaylight-l2-types.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools.model</groupId>
-        <artifactId>yang-ext</artifactId>
-        <version>${yang-ext.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools.thirdparty</groupId>
-        <artifactId>antlr4-runtime-osgi-nohead</artifactId>
-        <version>4.0</version>
-      </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools.thirdparty</groupId>
-        <artifactId>xtend-lib-osgi</artifactId>
-        <version>${xtend.version}</version>
-      </dependency>
+
       <dependency>
         <groupId>org.openexi</groupId>
         <artifactId>nagasena</artifactId>
         <version>${mdsal.version}</version>
         <scope>test</scope>
       </dependency>
-      <dependency>
-        <groupId>org.opendaylight.yangtools</groupId>
-        <artifactId>mockito-configuration</artifactId>
-        <version>${yangtools.version}</version>
-        <scope>test</scope>
-      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>features-config</artifactId>
         <type>xml</type>
         <scope>runtime</scope>
       </dependency>
-      <dependency>
-        <groupId>org.opendaylight.controller</groupId>
-        <artifactId>features-netconf</artifactId>
-        <version>${netconf.version}</version>
-        <classifier>features</classifier>
-        <type>xml</type>
-        <scope>runtime</scope>
-      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>features-config-persister</artifactId>
index ea87afa..98a2c2c 100644 (file)
@@ -98,10 +98,8 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
                 return;
             }
 
-            // Check if initial connection was fully finished. If the session was dropped during negotiation, reconnect will not happen.
-            // Session can be dropped during negotiation on purpose by the client side and would make no sense to initiate reconnect
             if (promise.isInitialConnectFinished() == false) {
-                return;
+                LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
             }
 
             LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
index 63026e3..fc38888 100644 (file)
@@ -250,52 +250,6 @@ public class ServerTest {
         assertFalse(session.isSuccess());
     }
 
-    @Test
-    public void testNegotiationFailedNoReconnect() throws Exception {
-        final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
-
-        this.dispatcher = getServerDispatcher(p);
-
-        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
-            @Override
-            public SimpleSessionListener getSessionListener() {
-                return new SimpleSessionListener();
-            }
-        });
-
-        this.server.get();
-
-        this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
-            @Override
-            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
-                                                                         final Channel channel, final Promise<SimpleSession> promise) {
-
-                return new SimpleSessionNegotiator(promise, channel) {
-                    @Override
-                    protected void startNegotiation() throws Exception {
-                        negotiationFailed(new IllegalStateException("Negotiation failed"));
-                    }
-                };
-            }
-        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
-
-        final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class);
-        final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
-        doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
-
-        this.clientDispatcher.createReconnectingClient(this.serverAddress,
-                reconnectStrategyFactory, new SessionListenerFactory<SimpleSessionListener>() {
-                    @Override
-                    public SimpleSessionListener getSessionListener() {
-                        return new SimpleSessionListener();
-                    }
-                });
-
-
-        // Only one strategy should be created for initial connect, no more = no reconnects
-        verify(reconnectStrategyFactory, times(1)).createReconnectStrategy();
-    }
-
     private SimpleDispatcher getClientDispatcher() {
         return new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
             @Override
index 5d6c11f..e46d327 100644 (file)
@@ -140,7 +140,7 @@ module config {
             "Top level container encapsulating configuration of all modules.";
 
         list module {
-            key "name";
+            key "type name";
             leaf name {
                 description "Unique module instance name";
                 type string;
index 4df9b03..1994e21 100644 (file)
@@ -5,26 +5,16 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-/**
- * Generated file
-
- * Generated from: yang module name: shutdown-impl  yang module local name: shutdown
- * Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
- * Generated at: Wed Dec 18 14:02:06 CET 2013
- *
- * Do not modify this file unless it is present under src/main directory
- */
 package org.opendaylight.controller.config.yang.shutdown.impl;
 
+import java.util.Arrays;
+import java.util.Set;
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.DependencyResolverFactory;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 
-import java.util.Arrays;
-import java.util.Set;
-
 public class ShutdownModuleFactory extends AbstractShutdownModuleFactory {
 
     public ShutdownModule instantiateModule(String instanceName, DependencyResolver dependencyResolver,
index 4abbd3b..7d97fcd 100644 (file)
@@ -8,15 +8,14 @@
 package org.opendaylight.controller.config.yang.shutdown.impl;
 
 import com.google.common.base.Optional;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
 import org.opendaylight.controller.config.shutdown.ShutdownService;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-
 public class ShutdownServiceImpl implements ShutdownService, AutoCloseable {
     private final ShutdownService impl;
     private final ShutdownRuntimeRegistration registration;
@@ -42,7 +41,7 @@ public class ShutdownServiceImpl implements ShutdownService, AutoCloseable {
 }
 
 class Impl implements ShutdownService {
-    private static final Logger logger = LoggerFactory.getLogger(Impl.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Impl.class);
     private final String secret;
     private final Bundle systemBundle;
 
@@ -53,27 +52,27 @@ class Impl implements ShutdownService {
 
     @Override
     public void shutdown(String inputSecret, Long maxWaitTime, Optional<String> reason) {
-        logger.warn("Shutdown issued with secret {} and reason {}", inputSecret, reason);
+        LOG.warn("Shutdown issued with secret {} and reason {}", inputSecret, reason);
         try {
             Thread.sleep(1000); // prevent brute force attack
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            logger.warn("Shutdown process interrupted", e);
+            LOG.warn("Shutdown process interrupted", e);
         }
         if (this.secret.equals(inputSecret)) {
-            logger.info("Server is shutting down");
+            LOG.info("Server is shutting down");
 
             // actual work:
             Thread stopSystemBundleThread = new StopSystemBundleThread(systemBundle);
             stopSystemBundleThread.start();
             if (maxWaitTime != null && maxWaitTime > 0) {
                 Thread systemExitThread = new CallSystemExitThread(maxWaitTime);
-                logger.debug("Scheduling {}", systemExitThread);
+                LOG.debug("Scheduling {}", systemExitThread);
                 systemExitThread.start();
             }
             // end
         } else {
-            logger.warn("Unauthorized attempt to shut down server");
+            LOG.warn("Unauthorized attempt to shut down server");
             throw new IllegalArgumentException("Invalid secret");
         }
     }
@@ -81,7 +80,7 @@ class Impl implements ShutdownService {
 }
 
 class StopSystemBundleThread extends Thread {
-    private static final Logger logger = LoggerFactory.getLogger(StopSystemBundleThread.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StopSystemBundleThread.class);
     private final Bundle systemBundle;
 
     StopSystemBundleThread(Bundle systemBundle) {
@@ -94,18 +93,18 @@ class StopSystemBundleThread extends Thread {
         try {
             // wait so that JMX response is received
             Thread.sleep(1000);
-            logger.debug("Stopping system bundle");
+            LOG.debug("Stopping system bundle");
             systemBundle.stop();
         } catch (BundleException e) {
-            logger.warn("Can not stop OSGi server", e);
+            LOG.warn("Can not stop OSGi server", e);
         } catch (InterruptedException e) {
-            logger.warn("Shutdown process interrupted", e);
+            LOG.warn("Shutdown process interrupted", e);
         }
     }
 }
 
 class CallSystemExitThread extends Thread {
-    private static final Logger logger = LoggerFactory.getLogger(CallSystemExitThread.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CallSystemExitThread.class);
     private final long maxWaitTime;
     CallSystemExitThread(long maxWaitTime) {
         super("call-system-exit-daemon");
@@ -128,7 +127,7 @@ class CallSystemExitThread extends Thread {
         try {
             // wait specified time
             Thread.sleep(maxWaitTime);
-            logger.error("Since some threads are still running, server is going to shut down via System.exit(1) !");
+            LOG.error("Since some threads are still running, server is going to shut down via System.exit(1) !");
             // do a thread dump
             ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
             StringBuffer sb = new StringBuffer();
@@ -136,10 +135,10 @@ class CallSystemExitThread extends Thread {
                 sb.append(info);
                 sb.append("\n");
             }
-            logger.warn("Thread dump:{}", sb);
+            LOG.warn("Thread dump:{}", sb);
             System.exit(1);
         } catch (InterruptedException e) {
-            logger.warn("Interrupted, not going to call System.exit(1)");
+            LOG.warn("Interrupted, not going to call System.exit(1)");
         }
     }
 }
index e30ff05..f6ecb44 100644 (file)
           <artifactId>sample-toaster-provider</artifactId>
           <version>${mdsal.version}</version>
         </dependency>
-        <dependency>
-          <groupId>org.opendaylight.controller.thirdparty</groupId>
-          <artifactId>ganymed</artifactId>
-        </dependency>
         <dependency>
           <groupId>org.apache.sshd</groupId>
           <artifactId>sshd-core</artifactId>
         <dependency>
           <groupId>org.opendaylight.yangtools</groupId>
           <artifactId>binding-generator-api</artifactId>
-          <version>${yangtools.version}</version>
         </dependency>
         <dependency>
           <groupId>org.opendaylight.yangtools</groupId>
         <dependency>
           <groupId>org.opendaylight.yangtools</groupId>
           <artifactId>binding-model-api</artifactId>
-          <version>${yangtools.version}</version>
         </dependency>
         <dependency>
           <groupId>org.opendaylight.yangtools</groupId>
index bbb6673..560d8a1 100644 (file)
@@ -7,16 +7,10 @@
  */
 package org.opendaylight.controller.sal.compatibility;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterables;
 
 import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
@@ -46,6 +40,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
@@ -59,6 +56,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.Link;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
@@ -81,21 +80,36 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener {
     private static final Logger LOG = LoggerFactory.getLogger(InventoryAndReadAdapter.class);
     private static final short OPENFLOWV10_TABLE_ID = 0;
+    private static final int SLEEP_FOR_NOTIFICATIONS_MILLIS = 500;
 
     private final InventoryNotificationProvider inventoryNotificationProvider = new InventoryNotificationProvider();
     private final Map<PathArgument,List<PathArgument>> nodeToNodeConnectorsMap = new ConcurrentHashMap<>();
     private List<IPluginOutInventoryService> inventoryPublisher = new CopyOnWriteArrayList<>();
     private List<IPluginOutReadService> statisticsPublisher = new CopyOnWriteArrayList<>();
+    private Cache<String, TransactionNotificationList<? extends TransactionAware>> txCache;
 
     private OpendaylightFlowTableStatisticsService flowTableStatisticsService;
     private OpendaylightPortStatisticsService nodeConnectorStatisticsService;
@@ -171,6 +185,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     public void startAdapter() {
         inventoryNotificationProvider.setDataProviderService(getDataProviderService());
         inventoryNotificationProvider.setInventoryPublisher(getInventoryPublisher());
+        txCache = CacheBuilder.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).maximumSize(10000).build();
         // inventoryNotificationProvider.start();
     }
 
@@ -251,22 +266,97 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
 
     @Override
     public List<FlowOnNode> readAllFlow(final Node node, final boolean cached) {
-        final ArrayList<FlowOnNode> output = new ArrayList<>();
-        final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
-        if (table != null) {
-            final List<Flow> flows = table.getFlow();
-            LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+        final ArrayList<FlowOnNode> ret= new ArrayList<>();
+        if (cached) {
+            final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
+            if (table != null) {
+                final List<Flow> flows = table.getFlow();
+                LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+
+                for (final Flow flow : flows) {
+                    final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
+                    if (statsFromDataStore != null) {
+                        final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
+                        ret.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+                    }
+                }
+            }
+        } else {
+            LOG.debug("readAllFlow cached:{}", cached);
+            GetAllFlowStatisticsFromFlowTableInput input =
+                new GetAllFlowStatisticsFromFlowTableInputBuilder()
+                    .setNode(NodeMapping.toNodeRef(node))
+                    .setTableId(new TableId(OPENFLOWV10_TABLE_ID))
+                    .build();
+
+            Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> future =
+                getFlowStatisticsService().getAllFlowStatisticsFromFlowTable(input);
 
-            for (final Flow flow : flows) {
-                final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
-                if (statsFromDataStore != null) {
-                    final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
-                    output.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+            RpcResult<GetAllFlowStatisticsFromFlowTableOutput> result = null;
+            try {
+                // having a blocking call is fine here, as we need to join
+                // the notifications and return the result
+                result = future.get();
+            } catch (Exception e) {
+               LOG.error("Exception in getAllFlowStatisticsFromFlowTable ", e);
+               return ret;
+            }
+
+            GetAllFlowStatisticsFromFlowTableOutput output = result.getResult();
+            if (output == null) {
+                return ret;
+            }
+
+            TransactionId transactionId = output.getTransactionId();
+            String cacheKey = buildCacheKey(transactionId, NodeMapping.toNodeId(node));
+            LOG.info("readAllFlow transactionId:{} cacheKey:{}", transactionId, cacheKey);
+
+            // insert an entry in tempcache, will get updated when notification is received
+            txCache.put(cacheKey, new TransactionNotificationList<FlowsStatisticsUpdate>(
+                transactionId, node.getNodeIDString()));
+
+            TransactionNotificationList<FlowsStatisticsUpdate> txnList =
+                (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+
+            // this loop would not be infinite as the cache will remove an entry
+            // after defined time if not written to
+            while (txnList != null && !txnList.areAllNotificationsGathered()) {
+                LOG.debug("readAllFlow waiting for notification...");
+                waitForNotification();
+                txnList = (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+            }
+
+            if (txnList == null) {
+                return ret;
+            }
+
+            List<FlowsStatisticsUpdate> notifications = txnList.getNotifications();
+            for (FlowsStatisticsUpdate flowsStatisticsUpdate : notifications) {
+                List<FlowAndStatisticsMapList> flowAndStatisticsMapList = flowsStatisticsUpdate.getFlowAndStatisticsMapList();
+                if (flowAndStatisticsMapList != null) {
+                    for (FlowAndStatisticsMapList flowAndStatistics : flowAndStatisticsMapList) {
+                        final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatistics, node));
+                        ret.add(addFlowStats(it, flowAndStatistics));
+                    }
                 }
             }
         }
+        return ret;
+    }
+
+    private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
+        return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
+    }
 
-        return output;
+    private void waitForNotification() {
+        try {
+            // going for a simple sleep approach,as wait-notify on a monitor would require
+            // us to maintain monitors per txn-node combo
+            Thread.sleep(SLEEP_FOR_NOTIFICATIONS_MILLIS);
+            LOG.trace("statCollector is waking up from a wait stat Response sleep");
+        } catch (final InterruptedException e) {
+            LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
+        }
     }
 
     @Override
@@ -623,6 +713,8 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
         for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) {
             statsPublisher.nodeFlowStatisticsUpdated(aDNode, adsalFlowsStatistics);
         }
+
+        updateTransactionCache(notification, notification.getId(), !notification.isMoreReplies());
     }
 
     /**
@@ -778,4 +870,48 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     private List<PathArgument> removeNodeConnectors(final InstanceIdentifier<? extends Object> nodeIdentifier) {
         return this.nodeToNodeConnectorsMap.remove(Iterables.get(nodeIdentifier.getPathArguments(), 1));
     }
+
+    private <T extends TransactionAware> void updateTransactionCache(T notification, NodeId nodeId, boolean lastNotification) {
+
+        String cacheKey = buildCacheKey(notification.getTransactionId(), nodeId);
+        TransactionNotificationList<T> txnList = (TransactionNotificationList<T>) txCache.getIfPresent(cacheKey);
+        final Optional<TransactionNotificationList<T>> optional = Optional.<TransactionNotificationList<T>>fromNullable(txnList);
+        if (optional.isPresent()) {
+            LOG.info("updateTransactionCache cacheKey:{}, lastNotification:{}, txnList-present:{}", cacheKey, lastNotification, optional.isPresent());
+            TransactionNotificationList<T> txn = optional.get();
+            txn.addNotification(notification);
+            txn.setAllNotificationsGathered(lastNotification);
+        }
+    }
+
+    private class TransactionNotificationList<T extends TransactionAware> {
+        private TransactionId id;
+        private String nId;
+        private List<T> notifications;
+        private boolean allNotificationsGathered;
+
+        public TransactionNotificationList(TransactionId id, String nId) {
+            this.nId = nId;
+            this.id = id;
+            notifications = new ArrayList<T>();
+        }
+
+        public void addNotification(T notification) {
+            notifications.add(notification);
+        }
+
+        public void setAllNotificationsGathered(boolean allNotificationsGathered) {
+            this.allNotificationsGathered = allNotificationsGathered;
+        }
+
+        public boolean areAllNotificationsGathered() {
+            return allNotificationsGathered;
+        }
+
+        public List<T> getNotifications() {
+            return notifications;
+        }
+
+    }
+
 }
index bcb2367..2bc3e60 100644 (file)
@@ -10,11 +10,6 @@ package org.opendaylight.controller.sal.compatibility;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
-import java.math.BigInteger;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.regex.Pattern;
 import org.opendaylight.controller.sal.common.util.Arguments;
 import org.opendaylight.controller.sal.core.AdvertisedBandwidth;
 import org.opendaylight.controller.sal.core.Bandwidth;
@@ -65,6 +60,12 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigInteger;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
 public final class NodeMapping {
 
     private static final Logger LOG = LoggerFactory
@@ -167,7 +168,7 @@ public final class NodeMapping {
      * @param aDNode
      * @return
      */
-    private static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
+    public static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
         String targetPrefix = null;
         if (NodeIDType.OPENFLOW.equals(aDNode.getType())) {
                 targetPrefix = OPENFLOW_ID_PREFIX;
index 04df778..97b912e 100644 (file)
@@ -100,7 +100,7 @@ public class ExampleActor extends RaftActor {
         try {
             bs = fromObject(state);
         } catch (Exception e) {
-            LOG.error("Exception in creating snapshot", e);
+            LOG.error(e, "Exception in creating snapshot");
         }
         getSelf().tell(new CaptureSnapshotReply(bs), null);
     }
@@ -110,10 +110,10 @@ public class ExampleActor extends RaftActor {
         try {
             state.putAll((HashMap) toObject(snapshot));
         } catch (Exception e) {
-           LOG.error("Exception in applying snapshot", e);
+           LOG.error(e, "Exception in applying snapshot");
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+            LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
         }
     }
 
index 64fa749..66a46ef 100644 (file)
@@ -29,9 +29,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
-import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -159,7 +157,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
-        LOG.debug("SnapshotOffer called..");
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("SnapshotOffer called..");
+        }
 
         initRecoveryTimer();
 
@@ -250,7 +250,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
             replicatedLog.snapshotTerm, replicatedLog.size());
 
-        currentBehavior = switchBehavior(RaftState.Follower);
+        currentBehavior = new Follower(context);
         onStateChanged();
     }
 
@@ -355,14 +355,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("onReceiveCommand: message:" + message.getClass());
+                    LOG.debug("onReceiveCommand: message: {}", message.getClass());
                 }
             }
 
-            RaftState state =
-                currentBehavior.handleMessage(getSender(), message);
             RaftActorBehavior oldBehavior = currentBehavior;
-            currentBehavior = switchBehavior(state);
+            currentBehavior = currentBehavior.handleMessage(getSender(), message);
+
             if(oldBehavior != currentBehavior){
                 onStateChanged();
             }
@@ -569,38 +568,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
-    private RaftActorBehavior switchBehavior(RaftState state) {
-        if (currentBehavior != null) {
-            if (currentBehavior.state() == state) {
-                return currentBehavior;
-            }
-            LOG.info("Switching from state " + currentBehavior.state() + " to "
-                + state);
-
-            try {
-                currentBehavior.close();
-            } catch (Exception e) {
-                LOG.error(e,
-                    "Failed to close behavior : " + currentBehavior.state());
-            }
-
-        } else {
-            LOG.info("Switching behavior to " + state);
-        }
-        RaftActorBehavior behavior = null;
-        if (state == RaftState.Candidate) {
-            behavior = new Candidate(context);
-        } else if (state == RaftState.Follower) {
-            behavior = new Follower(context);
-        } else {
-            behavior = new Leader(context);
-        }
-
-
-
-        return behavior;
-    }
-
     private void trimPersistentData(long sequenceNumber) {
         // Trim akka snapshots
         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
@@ -622,8 +589,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
         String peerAddress = context.getPeerAddress(leaderId);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
-                + peerAddress);
+            LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
+                    leaderId, peerAddress);
         }
 
         return peerAddress;
@@ -697,8 +664,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
         public void appendAndPersist(final ActorRef clientActor,
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
-            context.getLogger().debug(
-                "Append log entry and persist {} ", replicatedLogEntry);
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+            }
+
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
             journal.add(replicatedLogEntry);
 
index b1560a5..eed74bb 100644 (file)
@@ -10,9 +10,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
+import akka.event.LoggingAdapter;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -44,6 +44,11 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected final RaftActorContext context;
 
+    /**
+     *
+     */
+    protected final LoggingAdapter LOG;
+
     /**
      *
      */
@@ -57,6 +62,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
     protected AbstractRaftActorBehavior(RaftActorContext context) {
         this.context = context;
+        this.LOG = context.getLogger();
     }
 
     /**
@@ -71,7 +77,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param appendEntries  The AppendEntries message
      * @return
      */
-    protected abstract RaftState handleAppendEntries(ActorRef sender,
+    protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries);
 
 
@@ -83,19 +89,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param appendEntries
      * @return
      */
-    protected RaftState appendEntries(ActorRef sender,
+    protected RaftActorBehavior appendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
         // 1. Reply false if term < currentTerm (§5.1)
         if (appendEntries.getTerm() < currentTerm()) {
-            context.getLogger().debug(
-                "Cannot append entries because sender term " + appendEntries
-                    .getTerm() + " is less than " + currentTerm());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Cannot append entries because sender term {} is less than {}",
+                        appendEntries.getTerm(), currentTerm());
+            }
+
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
             );
-            return state();
+            return this;
         }
 
 
@@ -114,7 +122,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param appendEntriesReply The AppendEntriesReply message
      * @return
      */
-    protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
+    protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply);
 
     /**
@@ -125,11 +133,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param requestVote
      * @return
      */
-    protected RaftState requestVote(ActorRef sender,
+    protected RaftActorBehavior requestVote(ActorRef sender,
         RequestVote requestVote) {
 
-
-        context.getLogger().debug(requestVote.toString());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(requestVote.toString());
+        }
 
         boolean grantVote = false;
 
@@ -167,7 +176,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
         sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
 
-        return state();
+        return this;
     }
 
     /**
@@ -182,7 +191,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param requestVoteReply The RequestVoteReply message
      * @return
      */
-    protected abstract RaftState handleRequestVoteReply(ActorRef sender,
+    protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply);
 
     /**
@@ -341,12 +350,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             } else {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
-                context.getLogger().warning(
-                    "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+                LOG.warning(
+                        "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index);
                 break;
             }
         }
-        context.getLogger().debug("Setting last applied to {}", newLastApplied);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Setting last applied to {}", newLastApplied);
+        }
         context.setLastApplied(newLastApplied);
 
         // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
@@ -361,7 +372,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     @Override
-    public RaftState handleMessage(ActorRef sender, Object message) {
+    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
         if (message instanceof AppendEntries) {
             return appendEntries(sender, (AppendEntries) message);
         } else if (message instanceof AppendEntriesReply) {
@@ -371,10 +382,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         } else if (message instanceof RequestVoteReply) {
             return handleRequestVoteReply(sender, (RequestVoteReply) message);
         }
-        return state();
+        return this;
     }
 
     @Override public String getLeaderId() {
         return leaderId;
     }
+
+    protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
+        LOG.info("Switching from behavior {} to {}", this.state(), behavior.state());
+        try {
+            close();
+        } catch (Exception e) {
+            LOG.error(e, "Failed to close behavior : {}", this.state());
+        }
+
+        return behavior;
+    }
 }
index bb1927e..4a3e2c5 100644 (file)
@@ -52,7 +52,9 @@ public class Candidate extends AbstractRaftActorBehavior {
 
         peers = context.getPeerAddresses().keySet();
 
-        context.getLogger().debug("Election:Candidate has following peers:"+ peers);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Election:Candidate has following peers: {}", peers);
+        }
 
         if(peers.size() > 0) {
             // Votes are required from a majority of the peers including self.
@@ -78,21 +80,23 @@ public class Candidate extends AbstractRaftActorBehavior {
         scheduleElection(electionDuration());
     }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
-        context.getLogger().debug(appendEntries.toString());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(appendEntries.toString());
+        }
 
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
 
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
 
         if (requestVoteReply.isVoteGranted()) {
@@ -100,10 +104,10 @@ public class Candidate extends AbstractRaftActorBehavior {
         }
 
         if (voteCount >= votesRequired) {
-            return RaftState.Leader;
+            return switchBehavior(new Leader(context));
         }
 
-        return state();
+        return this;
     }
 
     @Override public RaftState state() {
@@ -111,7 +115,7 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+    public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
 
         Object message = fromSerializableMessage(originalMessage);
 
@@ -119,14 +123,17 @@ public class Candidate extends AbstractRaftActorBehavior {
 
             RaftRPC rpc = (RaftRPC) message;
 
-            context.getLogger().debug("RaftRPC message received {} my term is {}", rpc.toString(), context.getTermInformation().getCurrentTerm());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm());
+            }
 
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-                return RaftState.Follower;
+
+                return switchBehavior(new Follower(context));
             }
         }
 
@@ -137,11 +144,12 @@ public class Candidate extends AbstractRaftActorBehavior {
                 // ourselves the leader. This gives enough time for a leader
                 // who we do not know about (as a peer)
                 // to send a message to the candidate
-                return RaftState.Leader;
+
+                return switchBehavior(new Leader(context));
             }
             startNewTerm();
             scheduleElection(electionDuration());
-            return state();
+            return this;
         }
 
         return super.handleMessage(sender, message);
@@ -159,7 +167,9 @@ public class Candidate extends AbstractRaftActorBehavior {
         context.getTermInformation().updateAndPersist(currentTerm + 1,
             context.getId());
 
-        context.getLogger().debug("Starting new term " + (currentTerm + 1));
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Starting new term {}", (currentTerm + 1));
+        }
 
         // Request for a vote
         // TODO: Retry request for vote if replies do not arrive in a reasonable
index 1cfdf9d..7ada8b3 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.event.LoggingAdapter;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -39,17 +38,13 @@ import java.util.ArrayList;
 public class Follower extends AbstractRaftActorBehavior {
     private ByteString snapshotChunksCollected = ByteString.EMPTY;
 
-    private final LoggingAdapter LOG;
-
     public Follower(RaftActorContext context) {
         super(context);
 
-        LOG = context.getLogger();
-
         scheduleElection(electionDuration());
     }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
         if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
@@ -133,15 +128,14 @@ public class Follower extends AbstractRaftActorBehavior {
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
             );
-            return state();
+            return this;
         }
 
         if (appendEntries.getEntries() != null
             && appendEntries.getEntries().size() > 0) {
             if(LOG.isDebugEnabled()) {
                 LOG.debug(
-                    "Number of entries to be appended = " + appendEntries
-                        .getEntries().size()
+                    "Number of entries to be appended = {}", appendEntries.getEntries().size()
                 );
             }
 
@@ -168,8 +162,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
                     if(LOG.isDebugEnabled()) {
                         LOG.debug(
-                            "Removing entries from log starting at "
-                                + matchEntry.getIndex()
+                            "Removing entries from log starting at {}", matchEntry.getIndex()
                         );
                     }
 
@@ -181,9 +174,7 @@ public class Follower extends AbstractRaftActorBehavior {
             }
 
             if(LOG.isDebugEnabled()) {
-                context.getLogger().debug(
-                    "After cleanup entries to be added from = " + (addEntriesFrom
-                        + lastIndex())
+                LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
                 );
             }
 
@@ -191,17 +182,14 @@ public class Follower extends AbstractRaftActorBehavior {
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
 
-                context.getLogger().info(
-                    "Append entry to log " + appendEntries.getEntries().get(
-                        i).getData()
-                        .toString()
-                );
-                context.getReplicatedLog()
-                    .appendAndPersist(appendEntries.getEntries().get(i));
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+                }
+                context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
             }
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Log size is now " + context.getReplicatedLog().size());
+                LOG.debug("Log size is now {}", context.getReplicatedLog().size());
             }
         }
 
@@ -216,7 +204,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (prevCommitIndex != context.getCommitIndex()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Commit index set to " + context.getCommitIndex());
+                LOG.debug("Commit index set to {}", context.getCommitIndex());
             }
         }
 
@@ -239,24 +227,24 @@ public class Follower extends AbstractRaftActorBehavior {
         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
             lastIndex(), lastTerm()), actor());
 
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
-        return state();
+        return this;
     }
 
     @Override public RaftState state() {
         return RaftState.Follower;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+    @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
 
         Object message = fromSerializableMessage(originalMessage);
 
@@ -271,7 +259,7 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         if (message instanceof ElectionTimeout) {
-            return RaftState.Candidate;
+            return switchBehavior(new Candidate(context));
 
         } else if (message instanceof InstallSnapshot) {
             InstallSnapshot installSnapshot = (InstallSnapshot) message;
@@ -297,8 +285,10 @@ public class Follower extends AbstractRaftActorBehavior {
                 // this is the last chunk, create a snapshot object and apply
 
                 snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
-                context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
-                    snapshotChunksCollected.size());
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
+                            snapshotChunksCollected.size());
+                }
 
                 Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
                     new ArrayList<ReplicatedLogEntry>(),
@@ -324,7 +314,7 @@ public class Follower extends AbstractRaftActorBehavior {
                 true), actor());
 
         } catch (Exception e) {
-            context.getLogger().error("Exception in InstallSnapshot of follower", e);
+            LOG.error(e, "Exception in InstallSnapshot of follower:");
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                 installSnapshot.getChunkIndex(), false), actor());
index ff8a225..9edba85 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
-import akka.event.LoggingAdapter;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
@@ -81,13 +80,9 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private final int minReplicationCount;
 
-    private final LoggingAdapter LOG;
-
     public Leader(RaftActorContext context) {
         super(context);
 
-        LOG = context.getLogger();
-
         followers = context.getPeerAddresses().keySet();
 
         for (String followerId : followers) {
@@ -100,7 +95,7 @@ public class Leader extends AbstractRaftActorBehavior {
         }
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Leader has following peers:" + followers);
+            LOG.debug("Election:Leader has following peers: {}", followers);
         }
 
         if (followers.size() > 0) {
@@ -123,17 +118,17 @@ public class Leader extends AbstractRaftActorBehavior {
 
     }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
         if(LOG.isDebugEnabled()) {
             LOG.debug(appendEntries.toString());
         }
 
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
 
         if(! appendEntriesReply.isSuccess()) {
@@ -149,7 +144,7 @@ public class Leader extends AbstractRaftActorBehavior {
 
         if(followerLogInformation == null){
             LOG.error("Unknown follower {}", followerId);
-            return state();
+            return this;
         }
 
         if (appendEntriesReply.isSuccess()) {
@@ -199,7 +194,7 @@ public class Leader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
-        return state();
+        return this;
     }
 
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
@@ -222,16 +217,16 @@ public class Leader extends AbstractRaftActorBehavior {
         return null;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
-        return state();
+        return this;
     }
 
     @Override public RaftState state() {
         return RaftState.Leader;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+    @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
         Object message = fromSerializableMessage(originalMessage);
@@ -243,13 +238,15 @@ public class Leader extends AbstractRaftActorBehavior {
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-                return RaftState.Follower;
+
+                return switchBehavior(new Follower(context));
             }
         }
 
         try {
             if (message instanceof SendHeartBeat) {
-                return sendHeartBeat();
+                sendHeartBeat();
+                return this;
             } else if(message instanceof SendInstallSnapshot) {
                 installSnapshotIfNeeded();
             } else if (message instanceof Replicate) {
@@ -321,7 +318,7 @@ public class Leader extends AbstractRaftActorBehavior {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Replicate message " + logIndex);
+            LOG.debug("Replicate message {}", logIndex);
         }
 
         // Create a tracker entry we will use this later to notify the
@@ -445,7 +442,7 @@ public class Leader extends AbstractRaftActorBehavior {
                 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
                 mapFollowerToSnapshot.get(followerId).getTotalChunks());
         } catch (IOException e) {
-            LOG.error("InstallSnapshot failed for Leader.", e);
+            LOG.error(e, "InstallSnapshot failed for Leader.");
         }
     }
 
@@ -467,11 +464,10 @@ public class Leader extends AbstractRaftActorBehavior {
         return nextChunk;
     }
 
-    private RaftState sendHeartBeat() {
+    private void sendHeartBeat() {
         if (followers.size() > 0) {
             sendAppendEntries();
         }
-        return state();
     }
 
     private void stopHeartBeat() {
index ca2d916..064cd8b 100644 (file)
@@ -25,17 +25,18 @@ import org.opendaylight.controller.cluster.raft.RaftState;
  * differently.
  */
 public interface RaftActorBehavior extends AutoCloseable{
+
     /**
      * Handle a message. If the processing of the message warrants a state
-     * change then a new state should be returned otherwise this method should
-     * return the state for the current behavior.
+     * change then a new behavior should be returned otherwise this method should
+     * return the current behavior.
      *
      * @param sender The sender of the message
      * @param message A message that needs to be processed
      *
-     * @return The new state or self (this)
+     * @return The new behavior or current behavior
      */
-    RaftState handleMessage(ActorRef sender, Object message);
+    RaftActorBehavior handleMessage(ActorRef sender, Object message);
 
     /**
      * The state associated with a given behavior
index 22f3743..c15c919 100644 (file)
@@ -181,7 +181,7 @@ public class RaftActorTest extends AbstractActorTest {
                         return true;
                     }
                 }.from(raftActor.path().toString())
-                    .message("Switching from state Candidate to Leader")
+                    .message("Switching from behavior Candidate to Leader")
                     .occurrences(1).exec();
 
 
index 8068dfb..3893018 100644 (file)
@@ -7,7 +7,6 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -22,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
@@ -79,12 +79,12 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
 
             // Also expect an AppendEntriesReply to be sent where success is false
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
@@ -145,12 +145,12 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                 }
 
                 // Send an unknown message so that the state of the RaftActor remains unchanged
-                RaftState expected = behavior.handleMessage(getRef(), "unknown");
+                RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-                RaftState raftState =
+                RaftActorBehavior raftBehavior =
                     behavior.handleMessage(getRef(), appendEntries);
 
-                assertEquals(expected, raftState);
+                assertEquals(expected, raftBehavior);
 
                 assertEquals(1, log.size());
 
@@ -174,11 +174,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                     RaftActorBehavior behavior = createBehavior(
                         createActorContext(behaviorActor));
 
-                    RaftState raftState = behavior.handleMessage(getTestActor(),
+                    RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
                         new RequestVote(1000, "test", 10000, 999));
 
-                    if(behavior.state() != RaftState.Follower){
-                        assertEquals(RaftState.Follower, raftState);
+                    if(!(behavior instanceof Follower)){
+                        assertTrue(raftBehavior instanceof Follower);
                     } else {
 
                         final Boolean out =
@@ -228,11 +228,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
                     RaftActorBehavior behavior = createBehavior(actorContext);
 
-                    RaftState raftState = behavior.handleMessage(getTestActor(),
+                    RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
                         new RequestVote(1000, "test", 10000, 999));
 
-                    if(behavior.state() != RaftState.Follower){
-                        assertEquals(RaftState.Follower, raftState);
+                    if(!(behavior instanceof Follower)){
+                        assertTrue(raftBehavior instanceof Follower);
                     } else {
                         final Boolean out =
                             new ExpectMsg<Boolean>(duration("1 seconds"),
@@ -309,10 +309,10 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         setLastLogEntry(
             (MockRaftActorContext) actorContext, 0, 0, p);
 
-        RaftState raftState = createBehavior(actorContext)
+        RaftActorBehavior raftBehavior = createBehavior(actorContext)
             .handleMessage(actorRef, rpc);
 
-        assertEquals(RaftState.Follower, raftState);
+        assertTrue(raftBehavior instanceof Follower);
     }
 
     protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
index d478b17..a8d47e2 100644 (file)
@@ -9,7 +9,6 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -109,10 +108,10 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         Candidate candidate =
             new Candidate(raftActorContext);
 
-        RaftState raftState =
+        RaftActorBehavior raftBehavior =
             candidate.handleMessage(candidateActor, new ElectionTimeout());
 
-        Assert.assertEquals(RaftState.Leader, raftState);
+        Assert.assertTrue(raftBehavior instanceof Leader);
     }
 
     @Test
@@ -123,10 +122,10 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         Candidate candidate =
             new Candidate(raftActorContext);
 
-        RaftState raftState =
+        RaftActorBehavior raftBehavior =
             candidate.handleMessage(candidateActor, new ElectionTimeout());
 
-        Assert.assertEquals(RaftState.Candidate, raftState);
+        Assert.assertTrue(raftBehavior instanceof Candidate);
     }
 
     @Test
@@ -137,9 +136,9 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         Candidate candidate =
             new Candidate(raftActorContext);
 
-        RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+        RaftActorBehavior behaviorOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
 
-        Assert.assertEquals(RaftState.Leader, stateOnFirstVote);
+        Assert.assertTrue(behaviorOnFirstVote instanceof Leader);
 
     }
 
@@ -151,12 +150,12 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         Candidate candidate =
             new Candidate(raftActorContext);
 
-        RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+        RaftActorBehavior behaviorOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
 
-        RaftState stateOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true));
+        RaftActorBehavior behaviorOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true));
 
-        Assert.assertEquals(RaftState.Candidate, stateOnFirstVote);
-        Assert.assertEquals(RaftState.Leader, stateOnSecondVote);
+        Assert.assertTrue(behaviorOnFirstVote instanceof Candidate);
+        Assert.assertTrue(behaviorOnSecondVote instanceof Leader);
 
     }
 
index a72a7c4..edeab11 100644 (file)
@@ -9,7 +9,6 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -85,10 +84,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         Follower follower =
             new Follower(raftActorContext);
 
-        RaftState raftState =
+        RaftActorBehavior raftBehavior =
             follower.handleMessage(followerActor, new ElectionTimeout());
 
-        Assert.assertEquals(RaftState.Candidate, raftState);
+        Assert.assertTrue(raftBehavior instanceof Candidate);
     }
 
     @Test
@@ -187,7 +186,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             AppendEntries appendEntries =
                 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
 
             assertEquals(101L, context.getLastApplied());
@@ -226,12 +225,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
 
             // Also expect an AppendEntriesReply to be sent where success is false
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
@@ -302,12 +301,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
             assertEquals(5, log.last().getIndex() + 1);
             assertNotNull(log.get(3));
             assertNotNull(log.get(4));
@@ -382,12 +381,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
 
             // The entry at index 2 will be found out-of-sync with the leader
             // and will be removed
index 19af647..48543d7 100644 (file)
@@ -12,7 +12,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -54,8 +53,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             // handle message should return the Leader state when it receives an
             // unknown message
-            RaftState state = leader.handleMessage(senderActor, "foo");
-            Assert.assertEquals(RaftState.Leader, state);
+            RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
+            Assert.assertTrue(behavior instanceof Leader);
         }};
     }
 
@@ -125,7 +124,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
-                    RaftState raftState = leader
+                    RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, null,
                             new MockRaftActorContext.MockReplicatedLogEntry(1,
                                 100,
@@ -133,7 +132,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         ));
 
                     // State should not change
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
@@ -179,11 +178,11 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                             .build());
 
                     Leader leader = new Leader(actorContext);
-                    RaftState raftState = leader
+                    RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
 
                     // State should not change
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     assertEquals(1, actorContext.getCommitIndex());
 
@@ -258,10 +257,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                             new MockRaftActorContext.MockPayload("D"));
 
                     // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
-                    RaftState raftState = leader.handleMessage(
+                    RaftActorBehavior raftBehavior = leader.handleMessage(
                         senderActor, new Replicate(null, "state-id", entry));
 
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
                     Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
@@ -333,9 +332,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
                             new MockRaftActorContext.MockPayload("D"));
 
-                    RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
+                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
 
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     // check if installsnapshot gets called with the correct values.
                     final String out =
@@ -419,11 +418,11 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     //clears leaders log
                     actorContext.getReplicatedLog().removeFrom(0);
 
-                    RaftState raftState = leader.handleMessage(senderActor,
+                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
                         new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
                             leader.getFollowerToSnapshot().getChunkIndex(), true));
 
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     assertEquals(leader.mapFollowerToSnapshot.size(), 0);
                     assertEquals(leader.followerToLog.size(), 1);
index efd35cc..f7313f4 100644 (file)
@@ -76,7 +76,6 @@ public class TestHelper {
                 mavenBundle("org.apache.sshd", "sshd-core").versionAsInProject(), //
                 mavenBundle("org.openexi", "nagasena").versionAsInProject(), //
                 mavenBundle("org.openexi", "nagasena-rta").versionAsInProject(), //
-                mavenBundle(CONTROLLER + ".thirdparty", "ganymed").versionAsInProject(), //
                 mavenBundle(CONTROLLER, "netconf-mapping-api").versionAsInProject(), //
 
                 mavenBundle(CONTROLLER, "config-persister-impl").versionAsInProject(), //
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/NodeToNormalizedNodeBuilder.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/NodeToNormalizedNodeBuilder.java
deleted file mode 100644 (file)
index 03d632b..0000000
+++ /dev/null
@@ -1,856 +0,0 @@
-/*
- *
- *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- *  This program and the accompanying materials are made available under the
- *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
- *  and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- */
-
-package org.opendaylight.controller.cluster.datastore.node;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import org.opendaylight.controller.cluster.datastore.node.utils.NodeIdentifierFactory;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
-import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeContainerBuilder;
-import org.opendaylight.yangtools.yang.model.api.AugmentationSchema;
-import org.opendaylight.yangtools.yang.model.api.AugmentationTarget;
-import org.opendaylight.yangtools.yang.model.api.ChoiceCaseNode;
-import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
-import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * NormalizedNodeBuilder is a builder that walks through a tree like structure and constructs a
- * NormalizedNode from it.
- * <p/>
- * A large part of this code has been copied over from a similar class in sal-common-impl which was
- * originally supposed to convert a CompositeNode to NormalizedNode
- *
- * @param <T>
- */
-public abstract class NodeToNormalizedNodeBuilder<T extends PathArgument>
-    implements Identifiable<T> {
-
-    private final T identifier;
-
-    protected static final Logger logger = LoggerFactory
-        .getLogger(NodeToNormalizedNodeBuilder.class);
-
-    @Override
-    public T getIdentifier() {
-        return identifier;
-    }
-
-    ;
-
-    protected NodeToNormalizedNodeBuilder(final T identifier) {
-        super();
-        this.identifier = identifier;
-
-    }
-
-    /**
-     * @return Should return true if the node that this operation corresponds to is a mixin
-     */
-    public boolean isMixin() {
-        return false;
-    }
-
-
-    /**
-     * @return Should return true if the node that this operation corresponds to has a 'key'
-     * associated with it. This is typically true for a list-item or leaf-list entry in yang
-     */
-    public boolean isKeyedEntry() {
-        return false;
-    }
-
-    protected Set<QName> getQNameIdentifiers() {
-        return Collections.singleton(identifier.getNodeType());
-    }
-
-    public abstract NodeToNormalizedNodeBuilder<?> getChild(
-        final PathArgument child);
-
-    public abstract NodeToNormalizedNodeBuilder<?> getChild(QName child);
-
-    public abstract NormalizedNode<?, ?> normalize(QName nodeType, Node node);
-
-
-
-    private static abstract class SimpleTypeNormalization<T extends PathArgument>
-        extends NodeToNormalizedNodeBuilder<T> {
-
-        protected SimpleTypeNormalization(final T identifier) {
-            super(identifier);
-        }
-
-        @Override
-        public NormalizedNode<?, ?> normalize(final QName nodeType,
-            final Node node) {
-            checkArgument(node != null);
-            return normalizeImpl(nodeType, node);
-        }
-
-        protected abstract NormalizedNode<?, ?> normalizeImpl(QName nodeType,
-            Node node);
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(
-            final PathArgument child) {
-            return null;
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(final QName child) {
-            return null;
-        }
-
-        @Override
-        public NormalizedNode<?, ?> createDefault(
-            final PathArgument currentArg) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-
-    }
-
-
-    private static final class LeafNormalization extends
-        SimpleTypeNormalization<NodeIdentifier> {
-
-        private final LeafSchemaNode schema;
-
-        protected LeafNormalization(final LeafSchemaNode schema, final NodeIdentifier identifier) {
-            super(identifier);
-            this.schema = schema;
-        }
-
-        @Override
-        protected NormalizedNode<?, ?> normalizeImpl(final QName nodeType,
-            final Node node) {
-            Object value = NodeValueCodec.toTypeSafeValue(this.schema, this.schema.getType(), node);
-            return ImmutableNodes.leafNode(nodeType, value);
-
-        }
-
-    }
-
-
-    private static final class LeafListEntryNormalization extends
-        SimpleTypeNormalization<NodeWithValue> {
-
-        private final LeafListSchemaNode schema;
-
-        public LeafListEntryNormalization(final LeafListSchemaNode potential) {
-            super(new NodeWithValue(potential.getQName(), null));
-            this.schema = potential;
-        }
-
-        @Override
-        protected NormalizedNode<?, ?> normalizeImpl(final QName nodeType,
-            final Node node) {
-            final Object data = node.getValue();
-            if (data == null) {
-                Preconditions.checkArgument(false,
-                    "No data available in leaf list entry for " + nodeType);
-            }
-
-            Object value = NodeValueCodec.toTypeSafeValue(this.schema, this.schema.getType(), node);
-
-            NodeWithValue nodeId = new NodeWithValue(nodeType, value);
-            return Builders.leafSetEntryBuilder().withNodeIdentifier(nodeId)
-                .withValue(value).build();
-        }
-
-
-        @Override
-        public boolean isKeyedEntry() {
-            return true;
-        }
-    }
-
-
-    private static abstract class NodeToNormalizationNodeOperation<T extends PathArgument>
-        extends NodeToNormalizedNodeBuilder<T> {
-
-        protected NodeToNormalizationNodeOperation(final T identifier) {
-            super(identifier);
-        }
-
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        @Override
-        public final NormalizedNodeContainer<?, ?, ?> normalize(
-            final QName nodeType, final Node node) {
-            checkArgument(node != null);
-
-            if (!node.getType().equals(AugmentationNode.class.getSimpleName())
-                && !node.getType().equals(ContainerNode.class.getSimpleName())
-                && !node.getType().equals(MapNode.class.getSimpleName())) {
-                checkArgument(nodeType != null);
-            }
-
-            NormalizedNodeContainerBuilder builder = createBuilder(node);
-
-            Set<NodeToNormalizedNodeBuilder<?>> usedMixins = new HashSet<>();
-
-            logNode(node);
-
-            if (node.getChildCount() == 0 && (
-                node.getType().equals(LeafSetEntryNode.class.getSimpleName())
-                    || node.getType().equals(LeafNode.class.getSimpleName()))) {
-                PathArgument childPathArgument =
-                    NodeIdentifierFactory.getArgument(node.getPath());
-
-                final NormalizedNode child;
-                if (childPathArgument instanceof NodeWithValue) {
-                    final NodeWithValue nodeWithValue =
-                        new NodeWithValue(childPathArgument.getNodeType(),
-                            node.getValue());
-                    child =
-                        Builders.leafSetEntryBuilder()
-                            .withNodeIdentifier(nodeWithValue)
-                            .withValue(node.getValue()).build();
-                } else {
-                    child =
-                        ImmutableNodes.leafNode(childPathArgument.getNodeType(),
-                            node.getValue());
-                }
-                builder.addChild(child);
-            }
-
-            final List<Node> children = node.getChildList();
-            for (Node nodeChild : children) {
-
-                PathArgument childPathArgument =
-                    NodeIdentifierFactory.getArgument(nodeChild.getPath());
-
-                QName childNodeType = null;
-                NodeToNormalizedNodeBuilder childOp = null;
-
-                if (childPathArgument instanceof AugmentationIdentifier) {
-                    childOp = getChild(childPathArgument);
-                    checkArgument(childOp instanceof AugmentationNormalization, childPathArgument);
-                } else {
-                    childNodeType = childPathArgument.getNodeType();
-                    childOp = getChild(childNodeType);
-                }
-                // We skip unknown nodes if this node is mixin since
-                // it's nodes and parent nodes are interleaved
-                if (childOp == null && isMixin()) {
-                    continue;
-                } else if (childOp == null) {
-                    logger.error(
-                        "childOp is null and this operation is not a mixin : this = {}",
-                        this.toString());
-                }
-
-                checkArgument(childOp != null,
-                    "Node %s is not allowed inside %s",
-                    childNodeType, getIdentifier());
-
-                if (childOp.isMixin()) {
-                    if (usedMixins.contains(childOp)) {
-                        // We already run / processed that mixin, so to avoid
-                        // duplicate we are
-                        // skipping next nodes.
-                        continue;
-                    }
-                    // builder.addChild(childOp.normalize(nodeType, treeCacheNode));
-                    final NormalizedNode childNode =
-                        childOp.normalize(childNodeType, nodeChild);
-                    if (childNode != null)
-                        builder.addChild(childNode);
-                    usedMixins.add(childOp);
-                } else {
-                    final NormalizedNode childNode =
-                        childOp.normalize(childNodeType, nodeChild);
-                    if (childNode != null)
-                        builder.addChild(childNode);
-                }
-            }
-
-
-            try {
-                return (NormalizedNodeContainer<?, ?, ?>) builder.build();
-            } catch (Exception e) {
-                return null;
-            }
-
-        }
-
-        private void logNode(Node node) {
-            //let us find out the type of the node
-            logger.debug("We got a {} , with identifier {} with {} children",
-                node.getType(), node.getPath(),
-                node.getChildList());
-        }
-
-        @SuppressWarnings("rawtypes")
-        protected abstract NormalizedNodeContainerBuilder createBuilder(
-            final Node node);
-
-    }
-
-
-    private static abstract class DataContainerNormalizationOperation<T extends PathArgument>
-        extends NodeToNormalizationNodeOperation<T> {
-
-        private final DataNodeContainer schema;
-        private final Map<QName, NodeToNormalizedNodeBuilder<?>> byQName;
-        private final Map<PathArgument, NodeToNormalizedNodeBuilder<?>> byArg;
-
-        protected DataContainerNormalizationOperation(final T identifier,
-            final DataNodeContainer schema) {
-            super(identifier);
-            this.schema = schema;
-            this.byArg = new ConcurrentHashMap<>();
-            this.byQName = new ConcurrentHashMap<>();
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(
-            final PathArgument child) {
-            NodeToNormalizedNodeBuilder<?> potential = byArg.get(child);
-            if (potential != null) {
-                return potential;
-            }
-            potential = fromSchema(schema, child);
-            return register(potential);
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(final QName child) {
-            if (child == null) {
-                return null;
-            }
-
-            NodeToNormalizedNodeBuilder<?> potential = byQName.get(child);
-            if (potential != null) {
-                return potential;
-            }
-            potential = fromSchemaAndPathArgument(schema, child);
-            return register(potential);
-        }
-
-        private NodeToNormalizedNodeBuilder<?> register(
-            final NodeToNormalizedNodeBuilder<?> potential) {
-            if (potential != null) {
-                byArg.put(potential.getIdentifier(), potential);
-                for (QName qName : potential.getQNameIdentifiers()) {
-                    byQName.put(qName, potential);
-                }
-            }
-            return potential;
-        }
-
-    }
-
-
-    private static final class ListItemNormalization extends
-        DataContainerNormalizationOperation<NodeIdentifierWithPredicates> {
-
-        private final List<QName> keyDefinition;
-        private final ListSchemaNode schemaNode;
-
-        protected ListItemNormalization(
-            final NodeIdentifierWithPredicates identifier,
-            final ListSchemaNode schema) {
-            super(identifier, schema);
-            this.schemaNode = schema;
-            keyDefinition = schema.getKeyDefinition();
-        }
-
-        @Override
-        protected NormalizedNodeContainerBuilder createBuilder(
-            final Node node) {
-            NodeIdentifierWithPredicates nodeIdentifierWithPredicates =
-                (NodeIdentifierWithPredicates) NodeIdentifierFactory
-                    .createPathArgument(node
-                        .getPath(), schemaNode);
-            return Builders.mapEntryBuilder()
-                .withNodeIdentifier(
-                    nodeIdentifierWithPredicates
-            );
-        }
-
-        @Override
-        public NormalizedNode<?, ?> createDefault(
-            final PathArgument currentArg) {
-            DataContainerNodeAttrBuilder<NodeIdentifierWithPredicates, MapEntryNode>
-                builder =
-                Builders.mapEntryBuilder().withNodeIdentifier(
-                    (NodeIdentifierWithPredicates) currentArg);
-            for (Entry<QName, Object> keyValue : ((NodeIdentifierWithPredicates) currentArg)
-                .getKeyValues().entrySet()) {
-                if (keyValue.getValue() == null) {
-                    throw new NullPointerException(
-                        "Null value found for path : "
-                            + currentArg);
-                }
-                builder.addChild(Builders.leafBuilder()
-                    //
-                    .withNodeIdentifier(new NodeIdentifier(keyValue.getKey()))
-                    .withValue(keyValue.getValue()).build());
-            }
-            return builder.build();
-        }
-
-
-        @Override
-        public boolean isKeyedEntry() {
-            return true;
-        }
-    }
-
-
-    private static final class ContainerNormalization extends
-        DataContainerNormalizationOperation<NodeIdentifier> {
-
-        protected ContainerNormalization(final ContainerSchemaNode schema) {
-            super(new NodeIdentifier(schema.getQName()), schema);
-        }
-
-        @Override
-        protected NormalizedNodeContainerBuilder createBuilder(
-            final Node node) {
-            return Builders.containerBuilder()
-                .withNodeIdentifier(getIdentifier());
-        }
-
-        @Override
-        public NormalizedNode<?, ?> createDefault(
-            final PathArgument currentArg) {
-            return Builders.containerBuilder()
-                .withNodeIdentifier((NodeIdentifier) currentArg).build();
-        }
-
-    }
-
-
-    private static abstract class MixinNormalizationOp<T extends PathArgument>
-        extends NodeToNormalizationNodeOperation<T> {
-
-        protected MixinNormalizationOp(final T identifier) {
-            super(identifier);
-        }
-
-        @Override
-        public final boolean isMixin() {
-            return true;
-        }
-
-    }
-
-
-    private static final class LeafListMixinNormalization extends
-        MixinNormalizationOp<NodeIdentifier> {
-
-        private final NodeToNormalizedNodeBuilder<?> innerOp;
-
-        public LeafListMixinNormalization(final LeafListSchemaNode potential) {
-            super(new NodeIdentifier(potential.getQName()));
-            innerOp = new LeafListEntryNormalization(potential);
-        }
-
-        @Override
-        protected NormalizedNodeContainerBuilder createBuilder(
-            final Node node) {
-            return Builders.leafSetBuilder()
-                .withNodeIdentifier(getIdentifier());
-        }
-
-        @Override
-        public NormalizedNode<?, ?> createDefault(
-            final PathArgument currentArg) {
-            return Builders.leafSetBuilder().withNodeIdentifier(getIdentifier())
-                .build();
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(
-            final PathArgument child) {
-            if (child instanceof NodeWithValue) {
-                return innerOp;
-            }
-            return null;
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(final QName child) {
-            if (getIdentifier().getNodeType().equals(child)) {
-                return innerOp;
-            }
-            return null;
-        }
-
-    }
-
-
-    private static final class AugmentationNormalization extends
-        MixinNormalizationOp<AugmentationIdentifier> {
-
-        private final Map<QName, NodeToNormalizedNodeBuilder<?>> byQName;
-        private final Map<PathArgument, NodeToNormalizedNodeBuilder<?>> byArg;
-
-        public AugmentationNormalization(final AugmentationSchema augmentation,
-            final DataNodeContainer schema) {
-            super(augmentationIdentifierFrom(augmentation));
-
-            ImmutableMap.Builder<QName, NodeToNormalizedNodeBuilder<?>>
-                byQNameBuilder =
-                ImmutableMap.builder();
-            ImmutableMap.Builder<PathArgument, NodeToNormalizedNodeBuilder<?>>
-                byArgBuilder =
-                ImmutableMap.builder();
-
-            for (DataSchemaNode augNode : augmentation.getChildNodes()) {
-                DataSchemaNode resolvedNode =
-                    schema.getDataChildByName(augNode.getQName());
-                NodeToNormalizedNodeBuilder<?> resolvedOp =
-                    fromDataSchemaNode(resolvedNode);
-                byArgBuilder.put(resolvedOp.getIdentifier(), resolvedOp);
-                for (QName resQName : resolvedOp.getQNameIdentifiers()) {
-                    byQNameBuilder.put(resQName, resolvedOp);
-                }
-            }
-            byQName = byQNameBuilder.build();
-            byArg = byArgBuilder.build();
-
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(
-            final PathArgument child) {
-            return byArg.get(child);
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(final QName child) {
-            return byQName.get(child);
-        }
-
-        @Override
-        protected Set<QName> getQNameIdentifiers() {
-            return getIdentifier().getPossibleChildNames();
-        }
-
-        @SuppressWarnings("rawtypes")
-        @Override
-        protected NormalizedNodeContainerBuilder createBuilder(
-            final Node node) {
-            return Builders.augmentationBuilder()
-                .withNodeIdentifier(getIdentifier());
-        }
-
-        @Override
-        public NormalizedNode<?, ?> createDefault(
-            final PathArgument currentArg) {
-            return Builders.augmentationBuilder()
-                .withNodeIdentifier(getIdentifier())
-                .build();
-        }
-
-    }
-
-
-    private static final class ListMixinNormalization extends
-        MixinNormalizationOp<NodeIdentifier> {
-
-        private final ListItemNormalization innerNode;
-
-        public ListMixinNormalization(final ListSchemaNode list) {
-            super(new NodeIdentifier(list.getQName()));
-            this.innerNode =
-                new ListItemNormalization(new NodeIdentifierWithPredicates(
-                    list.getQName(), Collections.<QName, Object>emptyMap()),
-                    list);
-        }
-
-        @SuppressWarnings("rawtypes")
-        @Override
-        protected NormalizedNodeContainerBuilder createBuilder(
-            final Node node) {
-            return Builders.mapBuilder().withNodeIdentifier(getIdentifier());
-        }
-
-        @Override
-        public NormalizedNode<?, ?> createDefault(
-            final PathArgument currentArg) {
-            return Builders.mapBuilder().withNodeIdentifier(getIdentifier())
-                .build();
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(
-            final PathArgument child) {
-            if (child.getNodeType().equals(getIdentifier().getNodeType())) {
-                return innerNode;
-            }
-            return null;
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(final QName child) {
-            if (getIdentifier().getNodeType().equals(child)) {
-                return innerNode;
-            }
-            return null;
-        }
-
-    }
-
-
-    private static class ChoiceNodeNormalization extends
-        MixinNormalizationOp<NodeIdentifier> {
-
-        private final ImmutableMap<QName, NodeToNormalizedNodeBuilder<?>>
-            byQName;
-        private final ImmutableMap<PathArgument, NodeToNormalizedNodeBuilder<?>>
-            byArg;
-
-        protected ChoiceNodeNormalization(
-            final org.opendaylight.yangtools.yang.model.api.ChoiceNode schema) {
-            super(new NodeIdentifier(schema.getQName()));
-            ImmutableMap.Builder<QName, NodeToNormalizedNodeBuilder<?>>
-                byQNameBuilder =
-                ImmutableMap.builder();
-            ImmutableMap.Builder<PathArgument, NodeToNormalizedNodeBuilder<?>>
-                byArgBuilder =
-                ImmutableMap.builder();
-
-            for (ChoiceCaseNode caze : schema.getCases()) {
-                for (DataSchemaNode cazeChild : caze.getChildNodes()) {
-                    NodeToNormalizedNodeBuilder<?> childOp =
-                        fromDataSchemaNode(cazeChild);
-                    byArgBuilder.put(childOp.getIdentifier(), childOp);
-                    for (QName qname : childOp.getQNameIdentifiers()) {
-                        byQNameBuilder.put(qname, childOp);
-                    }
-                }
-            }
-            byQName = byQNameBuilder.build();
-            byArg = byArgBuilder.build();
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(
-            final PathArgument child) {
-            return byArg.get(child);
-        }
-
-        @Override
-        public NodeToNormalizedNodeBuilder<?> getChild(final QName child) {
-            return byQName.get(child);
-        }
-
-        @Override
-        protected NormalizedNodeContainerBuilder createBuilder(
-            final Node node) {
-            return Builders.choiceBuilder().withNodeIdentifier(getIdentifier());
-        }
-
-        @Override
-        public NormalizedNode<?, ?> createDefault(
-            final PathArgument currentArg) {
-            return Builders.choiceBuilder().withNodeIdentifier(getIdentifier())
-                .build();
-        }
-    }
-
-    /**
-     * Find an appropriate NormalizedNodeBuilder using both the schema and the
-     * Path Argument
-     *
-     * @param schema
-     * @param child
-     * @return
-     */
-    public static NodeToNormalizedNodeBuilder<?> fromSchemaAndPathArgument(
-        final DataNodeContainer schema, final QName child) {
-        DataSchemaNode potential = schema.getDataChildByName(child);
-        if (potential == null) {
-            Iterable<org.opendaylight.yangtools.yang.model.api.ChoiceNode>
-                choices =
-                FluentIterable.from(schema.getChildNodes()).filter(
-                    org.opendaylight.yangtools.yang.model.api.ChoiceNode.class);
-            potential = findChoice(choices, child);
-        }
-        if (potential == null) {
-            if (logger.isTraceEnabled()) {
-                logger.trace("BAD CHILD = {}", child.toString());
-            }
-        }
-
-        checkArgument(potential != null,
-            "Supplied QName %s is not valid according to schema %s", child,
-            schema);
-
-        // If the schema in an instance of DataSchemaNode and the potential
-        // is augmenting something then there is a chance that this may be
-        // and augmentation node
-        if ((schema instanceof DataSchemaNode)
-            && potential.isAugmenting()) {
-
-            AugmentationNormalization augmentation =
-                fromAugmentation(schema, (AugmentationTarget) schema,
-                    potential);
-
-            // If an augmentation normalization (builder) is not found then
-            // we fall through to the regular processing
-            if(augmentation != null){
-                return augmentation;
-            }
-        }
-        return fromDataSchemaNode(potential);
-    }
-
-    /**
-     * Given a bunch of choice nodes and a the name of child find a choice node for that child which
-     * has a non-null value
-     *
-     * @param choices
-     * @param child
-     * @return
-     */
-    private static org.opendaylight.yangtools.yang.model.api.ChoiceNode findChoice(
-        final Iterable<org.opendaylight.yangtools.yang.model.api.ChoiceNode> choices,
-        final QName child) {
-        org.opendaylight.yangtools.yang.model.api.ChoiceNode foundChoice = null;
-        choiceLoop:
-        for (org.opendaylight.yangtools.yang.model.api.ChoiceNode choice : choices) {
-            for (ChoiceCaseNode caze : choice.getCases()) {
-                if (caze.getDataChildByName(child) != null) {
-                    foundChoice = choice;
-                    break choiceLoop;
-                }
-            }
-        }
-        return foundChoice;
-    }
-
-
-    /**
-     * Create an AugmentationIdentifier based on the AugmentationSchema
-     *
-     * @param augmentation
-     * @return
-     */
-    public static AugmentationIdentifier augmentationIdentifierFrom(
-        final AugmentationSchema augmentation) {
-        ImmutableSet.Builder<QName> potentialChildren = ImmutableSet.builder();
-        for (DataSchemaNode child : augmentation.getChildNodes()) {
-            potentialChildren.add(child.getQName());
-        }
-        return new AugmentationIdentifier(potentialChildren.build());
-    }
-
-    /**
-     * Create an AugmentationNormalization based on the schema of the DataContainer, the
-     * AugmentationTarget and the potential schema node
-     *
-     * @param schema
-     * @param augments
-     * @param potential
-     * @return
-     */
-    private static AugmentationNormalization fromAugmentation(
-        final DataNodeContainer schema, final AugmentationTarget augments,
-        final DataSchemaNode potential) {
-        AugmentationSchema augmentation = null;
-        for (AugmentationSchema aug : augments.getAvailableAugmentations()) {
-            DataSchemaNode child = aug.getDataChildByName(potential.getQName());
-            if (child != null) {
-                augmentation = aug;
-                break;
-            }
-
-        }
-        if (augmentation != null) {
-            return new AugmentationNormalization(augmentation, schema);
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * @param schema
-     * @param child
-     * @return
-     */
-    private static NodeToNormalizedNodeBuilder<?> fromSchema(
-        final DataNodeContainer schema, final PathArgument child) {
-        if (child instanceof AugmentationIdentifier) {
-            QName childQName = ((AugmentationIdentifier) child)
-                .getPossibleChildNames().iterator().next();
-
-            return fromSchemaAndPathArgument(schema, childQName);
-        }
-        return fromSchemaAndPathArgument(schema, child.getNodeType());
-    }
-
-    public static NodeToNormalizedNodeBuilder<?> fromDataSchemaNode(
-        final DataSchemaNode potential) {
-        if (potential instanceof ContainerSchemaNode) {
-            return new ContainerNormalization((ContainerSchemaNode) potential);
-        } else if (potential instanceof ListSchemaNode) {
-            return new ListMixinNormalization((ListSchemaNode) potential);
-        } else if (potential instanceof LeafSchemaNode) {
-            return new LeafNormalization((LeafSchemaNode) potential,
-                new NodeIdentifier(potential.getQName()));
-        } else if (potential instanceof org.opendaylight.yangtools.yang.model.api.ChoiceNode) {
-            return new ChoiceNodeNormalization(
-                (org.opendaylight.yangtools.yang.model.api.ChoiceNode) potential);
-        } else if (potential instanceof LeafListSchemaNode) {
-            return new LeafListMixinNormalization(
-                (LeafListSchemaNode) potential);
-        }
-        return null;
-    }
-
-    public static NodeToNormalizedNodeBuilder<?> from(final SchemaContext ctx) {
-        return new ContainerNormalization(ctx);
-    }
-
-    public abstract NormalizedNode<?, ?> createDefault(PathArgument currentArg);
-
-}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/NodeValueCodec.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/NodeValueCodec.java
deleted file mode 100644 (file)
index b6dbefb..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- *  This program and the accompanying materials are made available under the
- *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
- *  and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- */
-
-package org.opendaylight.controller.cluster.datastore.node;
-
-import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
-import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.yangtools.yang.data.api.codec.BitsCodec;
-import org.opendaylight.yangtools.yang.data.impl.codec.TypeDefinitionAwareCodec;
-import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.TypeDefinition;
-import org.opendaylight.yangtools.yang.model.util.IdentityrefType;
-import org.opendaylight.yangtools.yang.model.util.InstanceIdentifierType;
-import org.opendaylight.yangtools.yang.model.util.Leafref;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NodeValueCodec {
-    protected static final Logger logger = LoggerFactory
-        .getLogger(NodeValueCodec.class);
-
-    public static Object toTypeSafeValue(DataSchemaNode schema, TypeDefinition type, NormalizedNodeMessages.Node node){
-
-        String value = node.getValue();
-
-        if(schema != null && value != null){
-            TypeDefinition<?> baseType = type;
-
-            while (baseType.getBaseType() != null) {
-                baseType = baseType.getBaseType();
-            }
-
-            TypeDefinitionAwareCodec<Object, ? extends TypeDefinition<?>> codec =
-                TypeDefinitionAwareCodec.from(type);
-
-            if(codec instanceof BitsCodec){
-                if(value.contains("[]")){
-                    value = "";
-                } else {
-                    value = value.replace("[", "");
-                    value = value.replace("]", "");
-                    value = value.replace(",", " ");
-                }
-            }
-
-            if (codec != null) {
-                return codec.deserialize(value);
-            } else if(baseType instanceof Leafref) {
-                return value;
-            } else if(baseType instanceof IdentityrefType) {
-                return QNameFactory.create(value);
-            } else if(baseType instanceof InstanceIdentifierType) {
-                return InstanceIdentifierUtils.fromSerializable(node.getInstanceIdentifierValue());
-            } else {
-                logger.error("Could not figure out how to transform value " + value +  " for schemaType " + type);
-            }
-        }
-
-        return value;
-    }
-}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/NormalizedNodeToProtocolBufferNode.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/NormalizedNodeToProtocolBufferNode.java
deleted file mode 100644 (file)
index 68d3c59..0000000
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- *
- *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- *  This program and the accompanying materials are made available under the
- *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
- *  and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- */
-
-package org.opendaylight.controller.cluster.datastore.node;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
-import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MixinNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
-import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
-
-import java.util.Map;
-
-/**
- * NormalizedNodeToProtocolBufferNode walks the NormalizedNode tree converting it to the
- * NormalizedMessage.Node
- * <p/>
- * {@link org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode } is a tree like structure that provides a generic structure for a yang data
- * model
- */
-public class NormalizedNodeToProtocolBufferNode {
-
-
-    private final Node.Builder builderRoot;
-    private NormalizedNodeMessages.Container container;
-
-    public NormalizedNodeToProtocolBufferNode() {
-
-        builderRoot = Node.newBuilder();
-    }
-
-    public void encode(String parentPath, NormalizedNode<?, ?> normalizedNode) {
-        if (parentPath == null) {
-            parentPath = "";
-        }
-
-        NormalizedNodeMessages.Container.Builder containerBuilder =
-            NormalizedNodeMessages.Container.newBuilder();
-
-        if (normalizedNode != null) {
-
-            navigateNormalizedNode(0, parentPath, normalizedNode, builderRoot);
-            // here we need to put back the Node Tree in Container
-
-            container =
-                containerBuilder.setParentPath(parentPath).setNormalizedNode(
-                    builderRoot.build()).build();
-        } else {
-            //this can happen when an attempt was made to read from datastore and normalized node was null.
-            container = containerBuilder.setParentPath(parentPath).build();
-
-        }
-
-    }
-
-
-    private void navigateDataContainerNode(int level, final String parentPath,
-        final DataContainerNode<?> dataContainerNode,
-        Node.Builder builderParent) {
-
-        String newParentPath =
-            parentPath + "/" + dataContainerNode.getIdentifier().toString();
-        String type = getDataContainerType(dataContainerNode).getSimpleName();
-        builderParent.setPath(dataContainerNode.getIdentifier().toString())
-            .setType(type);
-
-        final Iterable<DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>>
-            value =
-            dataContainerNode.getValue();
-        for (NormalizedNode<?, ?> node : value) {
-            Node.Builder builderChild = Node.newBuilder();
-            if (node instanceof MixinNode
-                && node instanceof NormalizedNodeContainer) {
-
-                navigateNormalizedNodeContainerMixin(level, newParentPath,
-                    (NormalizedNodeContainer<?, ?, ?>) node, builderChild);
-            } else {
-                navigateNormalizedNode(level, newParentPath, node,
-                    builderChild);
-            }
-            builderParent.addChild(builderChild);
-        }
-    }
-
-    private Class getDataContainerType(
-        NormalizedNodeContainer<?, ?, ?> dataContainerNode) {
-        if (dataContainerNode instanceof ChoiceNode) {
-            return ChoiceNode.class;
-        } else if (dataContainerNode instanceof AugmentationNode) {
-            return AugmentationNode.class;
-        } else if (dataContainerNode instanceof ContainerNode) {
-            return ContainerNode.class;
-        } else if (dataContainerNode instanceof MapEntryNode) {
-            return MapEntryNode.class;
-        } else if (dataContainerNode instanceof UnkeyedListEntryNode) {
-            return UnkeyedListEntryNode.class;
-        } else if (dataContainerNode instanceof MapNode) {
-            return MapNode.class;
-        } else if (dataContainerNode instanceof LeafSetNode) {
-            return LeafSetNode.class;
-        }
-        throw new IllegalArgumentException(
-            "could not find the data container node type "
-                + dataContainerNode.toString()
-        );
-    }
-
-    private void navigateNormalizedNodeContainerMixin(int level,
-        final String parentPath,
-        NormalizedNodeContainer<?, ?, ?> node, Node.Builder builderParent) {
-        String newParentPath =
-            parentPath + "/" + node.getIdentifier().toString();
-
-        builderParent.setPath(node.getIdentifier().toString()).setType(
-            this.getDataContainerType(node).getSimpleName());
-        final Iterable<? extends NormalizedNode<?, ?>> value = node.getValue();
-        for (NormalizedNode normalizedNode : value) {
-            // child node builder
-            Node.Builder builderChild = Node.newBuilder();
-            if (normalizedNode instanceof MixinNode
-                && normalizedNode instanceof NormalizedNodeContainer) {
-                navigateNormalizedNodeContainerMixin(level + 1, newParentPath,
-                    (NormalizedNodeContainer) normalizedNode, builderChild);
-            } else {
-                navigateNormalizedNode(level, newParentPath, normalizedNode,
-                    builderChild);
-            }
-            builderParent.addChild(builderChild);
-
-        }
-
-
-
-    }
-
-
-    private void navigateNormalizedNode(int level,
-        String parentPath, NormalizedNode<?, ?> normalizedNode,
-        Node.Builder builderParent) {
-
-        if (normalizedNode instanceof DataContainerNode) {
-
-            final DataContainerNode<?> dataContainerNode =
-                (DataContainerNode) normalizedNode;
-
-            navigateDataContainerNode(level + 1, parentPath, dataContainerNode,
-                builderParent);
-        } else if (normalizedNode instanceof MixinNode
-            && normalizedNode instanceof NormalizedNodeContainer) {
-
-            navigateNormalizedNodeContainerMixin(level, parentPath,
-                (NormalizedNodeContainer<?, ?, ?>) normalizedNode,
-                builderParent);
-        } else {
-            if (normalizedNode instanceof LeafNode) {
-                buildLeafNode(parentPath, normalizedNode, builderParent);
-            } else if (normalizedNode instanceof LeafSetEntryNode) {
-                buildLeafSetEntryNode(parentPath, normalizedNode,
-                    builderParent);
-            }
-
-        }
-
-    }
-
-    private void buildLeafSetEntryNode(String parentPath,
-        NormalizedNode<?, ?> normalizedNode,
-        Node.Builder builderParent) {
-        String path =
-            parentPath + "/" + normalizedNode.getIdentifier().toString();
-        LeafSetEntryNode leafSetEntryNode = (LeafSetEntryNode) normalizedNode;
-        Map<QName, String> attributes = leafSetEntryNode.getAttributes();
-        if (!attributes.isEmpty()) {
-            NormalizedNodeMessages.Attribute.Builder builder = null;
-            for (Map.Entry<QName, String> attribute : attributes.entrySet()) {
-                builder = NormalizedNodeMessages.Attribute.newBuilder();
-
-                builder
-                    .setName(attribute.getKey().toString())
-                    .setValue(normalizedNode.getValue().toString());
-
-                builderParent.addAttributes(builder.build());
-            }
-        }
-        buildNodeValue(normalizedNode, builderParent);
-    }
-
-    private void buildLeafNode(String parentPath,
-        NormalizedNode<?, ?> normalizedNode,
-        Node.Builder builderParent) {
-        Preconditions.checkNotNull(parentPath);
-        Preconditions.checkNotNull(normalizedNode);
-        String path =
-            parentPath + "/" + normalizedNode.getIdentifier().toString();
-        LeafNode leafNode = (LeafNode) normalizedNode;
-        Map<QName, String> attributes = leafNode.getAttributes();
-        if (!attributes.isEmpty()) {
-            NormalizedNodeMessages.Attribute.Builder builder = null;
-            for (Map.Entry<QName, String> attribute : attributes.entrySet()) {
-                builder = NormalizedNodeMessages.Attribute.newBuilder();
-                builder
-                    .setName(attribute.getKey().toString())
-                    .setValue(attribute.getValue().toString());
-
-                builderParent.addAttributes(builder.build());
-            }
-        }
-
-        Object value = normalizedNode.getValue();
-        if (value == null) {
-            builderParent
-                .setPath(normalizedNode.getIdentifier().toString())
-                .setType(LeafNode.class.getSimpleName())
-                .setValueType(String.class.getSimpleName())
-                .setValue("");
-        } else {
-            buildNodeValue(normalizedNode, builderParent);
-        }
-    }
-
-    private void buildNodeValue(NormalizedNode<?, ?> normalizedNode,
-        Node.Builder builderParent) {
-
-        Object value = normalizedNode.getValue();
-
-        builderParent
-            .setPath(normalizedNode.getIdentifier().toString())
-            .setType(LeafNode.class.getSimpleName())
-            .setValueType((value.getClass().getSimpleName()))
-            .setValue(value.toString());
-
-        if(value.getClass().equals(YangInstanceIdentifier.class)){
-            builderParent.setInstanceIdentifierValue(
-                InstanceIdentifierUtils
-                    .toSerializable((YangInstanceIdentifier) value));
-        }
-    }
-
-    public NormalizedNodeMessages.Container getContainer() {
-        return container;
-    }
-}
index ea3986f..da61e6d 100644 (file)
@@ -32,20 +32,6 @@ public class NodeIdentifierFactory {
         return value;
     }
 
-    public static YangInstanceIdentifier.PathArgument getArgument(String id, DataSchemaNode schemaNode){
-        YangInstanceIdentifier.PathArgument value = cache.get(id);
-        if(value == null){
-            synchronized (cache){
-                value = cache.get(id);
-                if(value == null) {
-                    value = createPathArgument(id, schemaNode);
-                    cache.put(id, value);
-                }
-            }
-        }
-        return value;
-    }
-
     public static YangInstanceIdentifier.PathArgument createPathArgument(String id, DataSchemaNode schemaNode){
         final NodeIdentifierWithPredicatesGenerator
             nodeIdentifierWithPredicatesGenerator = new NodeIdentifierWithPredicatesGenerator(id, schemaNode);
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NodeTypes.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NodeTypes.java
new file mode 100644 (file)
index 0000000..3ff6efb
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ *
+ *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ *  and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+public class NodeTypes {
+
+    public static final byte LEAF_NODE = 1;
+    public static final byte LEAF_SET = 2;
+    public static final byte LEAF_SET_ENTRY_NODE = 3;
+    public static final byte CONTAINER_NODE = 4;
+    public static final byte UNKEYED_LIST = 5;
+    public static final byte UNKEYED_LIST_ITEM = 6;
+    public static final byte MAP_NODE = 7;
+    public static final byte MAP_ENTRY_NODE = 8;
+    public static final byte ORDERED_MAP_NODE = 9;
+    public static final byte CHOICE_NODE = 10;
+    public static final byte AUGMENTATION_NODE = 11;
+    public static final byte ANY_XML_NODE = 12;
+    public static final byte END_NODE = 13;
+
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java
new file mode 100644 (file)
index 0000000..bdc52bc
--- /dev/null
@@ -0,0 +1,391 @@
+/*
+ *
+ *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ *  and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.OrderedMapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeAttrBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * NormalizedNodeInputStreamReader reads the byte stream and constructs the normalized node including its children nodes.
+ * This process goes in recursive manner, where each NodeTypes object signifies the start of the object, except END_NODE.
+ * If a node can have children, then that node's end is calculated based on appearance of END_NODE.
+ *
+ */
+
+public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamReader {
+
+    private DataInputStream reader;
+
+    private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeInputStreamReader.class);
+
+    private Map<Integer, String> codedStringMap = new HashMap<>();
+    private static final String REVISION_ARG = "?revision=";
+
+    public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
+        Preconditions.checkNotNull(stream);
+        reader = new DataInputStream(stream);
+    }
+
+
+    public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
+        NormalizedNode<?, ?> node = null;
+
+        // each node should start with a byte
+        byte nodeType = reader.readByte();
+
+        if(nodeType == NodeTypes.END_NODE) {
+            LOG.debug("End node reached. return");
+            return null;
+        }
+        else if(nodeType == NodeTypes.AUGMENTATION_NODE) {
+            LOG.debug("Reading augmentation node. will create augmentation identifier");
+
+            YangInstanceIdentifier.AugmentationIdentifier identifier =
+                new YangInstanceIdentifier.AugmentationIdentifier(readQNameSet());
+            DataContainerNodeBuilder<YangInstanceIdentifier.AugmentationIdentifier, AugmentationNode> augmentationBuilder =
+                Builders.augmentationBuilder().withNodeIdentifier(identifier);
+            augmentationBuilder = addDataContainerChildren(augmentationBuilder);
+            node = augmentationBuilder.build();
+
+        } else {
+            QName qName = readQName();
+
+            if(nodeType == NodeTypes.LEAF_SET_ENTRY_NODE) {
+                LOG.debug("Reading leaf set entry node. Will create NodeWithValue instance identifier");
+
+                // Read the object value
+                Object value = readObject();
+
+                YangInstanceIdentifier.NodeWithValue nodeWithValue = new YangInstanceIdentifier.NodeWithValue(qName, value);
+                node =  Builders.leafSetEntryBuilder().withNodeIdentifier(nodeWithValue).withValue(value).build();
+
+            } else if(nodeType == NodeTypes.MAP_ENTRY_NODE) {
+                LOG.debug("Reading map entry node. Will create node identifier with predicates.");
+
+                YangInstanceIdentifier.NodeIdentifierWithPredicates nodeIdentifier =
+                    new YangInstanceIdentifier.NodeIdentifierWithPredicates(qName, readKeyValueMap());
+                DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifierWithPredicates, MapEntryNode> mapEntryBuilder
+                    = Builders.mapEntryBuilder().withNodeIdentifier(nodeIdentifier);
+
+                mapEntryBuilder = (DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifierWithPredicates,
+                    MapEntryNode>)addDataContainerChildren(mapEntryBuilder);
+                node = mapEntryBuilder.build();
+
+            } else {
+                LOG.debug("Creating standard node identifier. ");
+                YangInstanceIdentifier.NodeIdentifier identifier = new YangInstanceIdentifier.NodeIdentifier(qName);
+                node = readNodeIdentifierDependentNode(nodeType, identifier);
+
+            }
+        }
+        return node;
+    }
+
+    private NormalizedNode<?, ?> readNodeIdentifierDependentNode(byte nodeType, YangInstanceIdentifier.NodeIdentifier identifier)
+        throws IOException {
+
+        switch(nodeType) {
+            case NodeTypes.LEAF_NODE :
+                LOG.debug("Read leaf node");
+                // Read the object value
+                NormalizedNodeAttrBuilder leafBuilder = Builders.leafBuilder();
+                return leafBuilder.withNodeIdentifier(identifier).withValue(readObject()).build();
+
+            case NodeTypes.ANY_XML_NODE :
+                LOG.debug("Read xml node");
+                Node value = (Node) readObject();
+                return Builders.anyXmlBuilder().withValue(value).build();
+
+            case NodeTypes.MAP_NODE :
+                LOG.debug("Read map node");
+                CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = Builders.mapBuilder().withNodeIdentifier(identifier);
+                mapBuilder = addMapNodeChildren(mapBuilder);
+                return mapBuilder.build();
+
+            case NodeTypes.CHOICE_NODE :
+                LOG.debug("Read choice node");
+                DataContainerNodeBuilder<YangInstanceIdentifier.NodeIdentifier, ChoiceNode> choiceBuilder =
+                    Builders.choiceBuilder().withNodeIdentifier(identifier);
+                choiceBuilder = addDataContainerChildren(choiceBuilder);
+                return choiceBuilder.build();
+
+            case NodeTypes.ORDERED_MAP_NODE :
+                LOG.debug("Reading ordered map node");
+                CollectionNodeBuilder<MapEntryNode, OrderedMapNode> orderedMapBuilder =
+                    Builders.orderedMapBuilder().withNodeIdentifier(identifier);
+                orderedMapBuilder = addMapNodeChildren(orderedMapBuilder);
+                return orderedMapBuilder.build();
+
+            case NodeTypes.UNKEYED_LIST :
+                LOG.debug("Read unkeyed list node");
+                CollectionNodeBuilder<UnkeyedListEntryNode, UnkeyedListNode> unkeyedListBuilder =
+                    Builders.unkeyedListBuilder().withNodeIdentifier(identifier);
+                unkeyedListBuilder = addUnkeyedListChildren(unkeyedListBuilder);
+                return unkeyedListBuilder.build();
+
+            case NodeTypes.UNKEYED_LIST_ITEM :
+                LOG.debug("Read unkeyed list item node");
+                DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, UnkeyedListEntryNode> unkeyedListEntryBuilder
+                    = Builders.unkeyedListEntryBuilder().withNodeIdentifier(identifier);
+
+                unkeyedListEntryBuilder = (DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, UnkeyedListEntryNode>)
+                    addDataContainerChildren(unkeyedListEntryBuilder);
+                return unkeyedListEntryBuilder.build();
+
+            case NodeTypes.CONTAINER_NODE :
+                LOG.debug("Read container node");
+                DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> containerBuilder =
+                    Builders.containerBuilder().withNodeIdentifier(identifier);
+
+                containerBuilder = (DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode>)
+                    addDataContainerChildren(containerBuilder);
+                return containerBuilder.build();
+
+            case NodeTypes.LEAF_SET :
+                LOG.debug("Read leaf set node");
+                ListNodeBuilder<Object, LeafSetEntryNode<Object>> leafSetBuilder =
+                    Builders.leafSetBuilder().withNodeIdentifier(identifier);
+                leafSetBuilder = addLeafSetChildren(leafSetBuilder);
+                return leafSetBuilder.build();
+
+            default :
+                return null;
+        }
+    }
+
+    private QName readQName() throws IOException {
+        // Read in the same sequence of writing
+        String localName = readCodedString();
+        String namespace = readCodedString();
+        String revision = readCodedString();
+        String qName;
+        // Not using stringbuilder as compiler optimizes string concatenation of +
+        if(revision != null){
+            qName = "(" + namespace+ REVISION_ARG + revision + ")" +localName;
+        } else {
+            qName = "(" + namespace + ")" +localName;
+        }
+
+        return QNameFactory.create(qName);
+    }
+
+
+    private String readCodedString() throws IOException {
+        boolean readFromMap = reader.readBoolean();
+        if(readFromMap) {
+            return codedStringMap.get(reader.readInt());
+        } else {
+            String value = reader.readUTF();
+            if(value != null) {
+                codedStringMap.put(Integer.valueOf(codedStringMap.size()), value);
+            }
+            return value;
+        }
+    }
+
+    private Set<QName> readQNameSet() throws IOException{
+        // Read the children count
+        int count = reader.readInt();
+        Set<QName> children = new HashSet<>(count);
+        for(int i = 0; i<count; i++) {
+            children.add(readQName());
+        }
+        return children;
+    }
+
+    private Map<QName, Object> readKeyValueMap() throws IOException {
+        int count = reader.readInt();
+        Map<QName, Object> keyValueMap = new HashMap<>(count);
+
+        for(int i = 0; i<count; i++) {
+            keyValueMap.put(readQName(), readObject());
+        }
+
+        return keyValueMap;
+    }
+
+    private Object readObject() throws IOException {
+        byte objectType = reader.readByte();
+        switch(objectType) {
+            case ValueTypes.BITS_TYPE:
+                return readObjSet();
+
+            case ValueTypes.BOOL_TYPE :
+                return reader.readBoolean();
+
+            case ValueTypes.BYTE_TYPE :
+                return reader.readByte();
+
+            case ValueTypes.INT_TYPE :
+                return reader.readInt();
+
+            case ValueTypes.LONG_TYPE :
+                return reader.readLong();
+
+            case ValueTypes.QNAME_TYPE :
+                return readQName();
+
+            case ValueTypes.SHORT_TYPE :
+                return reader.readShort();
+
+            case ValueTypes.STRING_TYPE :
+                return reader.readUTF();
+
+            case ValueTypes.BIG_DECIMAL_TYPE :
+                return new BigDecimal(reader.readUTF());
+
+            case ValueTypes.BIG_INTEGER_TYPE :
+                return new BigInteger(reader.readUTF());
+
+            case ValueTypes.YANG_IDENTIFIER_TYPE :
+                int size = reader.readInt();
+
+                List<YangInstanceIdentifier.PathArgument> pathArguments = new ArrayList<>(size);
+
+                for(int i=0; i<size; i++) {
+                    pathArguments.add(readPathArgument());
+                }
+                return YangInstanceIdentifier.create(pathArguments);
+
+            default :
+                return null;
+        }
+    }
+
+    private Set<String> readObjSet() throws IOException {
+        int count = reader.readInt();
+        Set<String> children = new HashSet<>(count);
+        for(int i = 0; i<count; i++) {
+            children.add(readCodedString());
+        }
+        return children;
+    }
+
+    private YangInstanceIdentifier.PathArgument readPathArgument() throws IOException {
+        // read Type
+        int type = reader.readByte();
+
+        switch(type) {
+
+            case PathArgumentTypes.AUGMENTATION_IDENTIFIER :
+                return new YangInstanceIdentifier.AugmentationIdentifier(readQNameSet());
+
+            case PathArgumentTypes.NODE_IDENTIFIER :
+            return new YangInstanceIdentifier.NodeIdentifier(readQName());
+
+            case PathArgumentTypes.NODE_IDENTIFIER_WITH_PREDICATES :
+            return new YangInstanceIdentifier.NodeIdentifierWithPredicates(readQName(), readKeyValueMap());
+
+            case PathArgumentTypes.NODE_IDENTIFIER_WITH_VALUE :
+            return new YangInstanceIdentifier.NodeWithValue(readQName(), readObject());
+
+            default :
+                return null;
+        }
+    }
+
+    private ListNodeBuilder<Object, LeafSetEntryNode<Object>> addLeafSetChildren(ListNodeBuilder<Object,
+        LeafSetEntryNode<Object>> builder)
+        throws IOException {
+
+        LOG.debug("Reading children of leaf set");
+        LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNode();
+
+        while(child != null) {
+            builder.withChild(child);
+            child = (LeafSetEntryNode<Object>)readNormalizedNode();
+        }
+        return builder;
+    }
+
+    private CollectionNodeBuilder<UnkeyedListEntryNode, UnkeyedListNode> addUnkeyedListChildren(
+        CollectionNodeBuilder<UnkeyedListEntryNode, UnkeyedListNode> builder)
+        throws IOException{
+
+        LOG.debug("Reading children of unkeyed list");
+        UnkeyedListEntryNode child = (UnkeyedListEntryNode)readNormalizedNode();
+
+        while(child != null) {
+            builder.withChild(child);
+            child = (UnkeyedListEntryNode)readNormalizedNode();
+        }
+        return builder;
+    }
+
+    private DataContainerNodeBuilder addDataContainerChildren(DataContainerNodeBuilder builder)
+        throws IOException {
+        LOG.debug("Reading data container (leaf nodes) nodes");
+
+        DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?> child =
+            (DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>) readNormalizedNode();
+
+        while(child != null) {
+            builder.withChild(child);
+            child =
+                (DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>) readNormalizedNode();
+        }
+        return builder;
+    }
+
+
+    private CollectionNodeBuilder addMapNodeChildren(CollectionNodeBuilder builder)
+        throws IOException {
+        LOG.debug("Reading map node children");
+        MapEntryNode child = (MapEntryNode)readNormalizedNode();
+
+        while(child != null){
+            builder.withChild(child);
+            child = (MapEntryNode)readNormalizedNode();
+        }
+
+        return builder;
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
new file mode 100644 (file)
index 0000000..05a47a0
--- /dev/null
@@ -0,0 +1,343 @@
+/*
+ *
+ *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ *  and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
+ * a stream.
+ * A stream writer wrapper around this class will write node objects to stream in recursive manner.
+ * for example - If you have a ContainerNode which has a two LeafNode as children, then
+ * you will first call {@link #startContainerNode(YangInstanceIdentifier.NodeIdentifier, int)}, then will call
+ * {@link #leafNode(YangInstanceIdentifier.NodeIdentifier, Object)} twice and then, {@link #endNode()} to end
+ * container node.
+ *
+ * Based on the each node, the node type is also written to the stream, that helps in reconstructing the object,
+ * while reading.
+ *
+ *
+ */
+
+public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWriter{
+
+    private DataOutputStream writer;
+
+    private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
+
+    private Map<String, Integer> stringCodeMap = new HashMap<>();
+
+    public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
+        Preconditions.checkNotNull(stream);
+        writer = new DataOutputStream(stream);
+    }
+
+    @Override
+    public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(name, "Node identifier should not be null");
+        LOG.debug("Writing a new leaf node");
+        startNode(name.getNodeType(), NodeTypes.LEAF_NODE);
+
+        writeObject(value);
+    }
+
+    @Override
+    public void startLeafSet(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(name, "Node identifier should not be null");
+        LOG.debug("Starting a new leaf set");
+
+        startNode(name.getNodeType(), NodeTypes.LEAF_SET);
+    }
+
+    @Override
+    public void leafSetEntryNode(YangInstanceIdentifier.NodeWithValue name, Object value) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(name, "Node identifier should not be null");
+
+        LOG.debug("Writing a new leaf set entry node");
+        startNode(name.getNodeType(), NodeTypes.LEAF_SET_ENTRY_NODE);
+
+        writeObject(value);
+    }
+
+    @Override
+    public void startContainerNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(name, "Node identifier should not be null");
+
+        LOG.debug("Starting a new container node");
+
+        startNode(name.getNodeType(), NodeTypes.CONTAINER_NODE);
+    }
+
+    @Override
+    public void startUnkeyedList(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(name, "Node identifier should not be null");
+        LOG.debug("Starting a new unkeyed list");
+
+        startNode(name.getNodeType(), NodeTypes.UNKEYED_LIST);
+    }
+
+    @Override
+    public void startUnkeyedListItem(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalStateException {
+        Preconditions.checkNotNull(name, "Node identifier should not be null");
+        LOG.debug("Starting a new unkeyed list item");
+
+        startNode(name.getNodeType(), NodeTypes.UNKEYED_LIST_ITEM);
+    }
+
+    @Override
+    public void startMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(name, "Node identifier should not be null");
+        LOG.debug("Starting a new map node");
+
+        startNode(name.getNodeType(), NodeTypes.MAP_NODE);
+    }
+
+    @Override
+    public void startMapEntryNode(YangInstanceIdentifier.NodeIdentifierWithPredicates identifier, int childSizeHint) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(identifier, "Node identifier should not be null");
+        LOG.debug("Starting a new map entry node");
+        startNode(identifier.getNodeType(), NodeTypes.MAP_ENTRY_NODE);
+
+        writeKeyValueMap(identifier.getKeyValues());
+
+    }
+
+    @Override
+    public void startOrderedMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(name, "Node identifier should not be null");
+        LOG.debug("Starting a new ordered map node");
+
+        startNode(name.getNodeType(), NodeTypes.ORDERED_MAP_NODE);
+    }
+
+    @Override
+    public void startChoiceNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(name, "Node identifier should not be null");
+        LOG.debug("Starting a new choice node");
+
+        startNode(name.getNodeType(), NodeTypes.CHOICE_NODE);
+    }
+
+    @Override
+    public void startAugmentationNode(YangInstanceIdentifier.AugmentationIdentifier identifier) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(identifier, "Node identifier should not be null");
+        LOG.debug("Starting a new augmentation node");
+
+        writer.writeByte(NodeTypes.AUGMENTATION_NODE);
+        writeQNameSet(identifier.getPossibleChildNames());
+    }
+
+    @Override
+    public void anyxmlNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
+        Preconditions.checkNotNull(name, "Node identifier should not be null");
+        LOG.debug("Writing a new xml node");
+
+        startNode(name.getNodeType(), NodeTypes.ANY_XML_NODE);
+
+        writeObject(value);
+    }
+
+    @Override
+    public void endNode() throws IOException, IllegalStateException {
+        LOG.debug("Ending the node");
+
+        writer.writeByte(NodeTypes.END_NODE);
+    }
+
+    @Override
+    public void close() throws IOException {
+        writer.close();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        writer.flush();
+    }
+
+    private void startNode(final QName qName, byte nodeType) throws IOException {
+
+        Preconditions.checkNotNull(qName, "QName of node identifier should not be null.");
+        // First write the type of node
+        writer.writeByte(nodeType);
+        // Write Start Tag
+        writeQName(qName);
+    }
+
+    private void writeQName(QName qName) throws IOException {
+
+        writeCodedString(qName.getLocalName());
+        writeCodedString(qName.getNamespace().toString());
+        writeCodedString(qName.getFormattedRevision());
+    }
+
+    private void writeCodedString(String key) throws IOException {
+        Integer value = stringCodeMap.get(key);
+
+        if(value != null) {
+            writer.writeBoolean(true);
+            writer.writeInt(value);
+        } else {
+            if(key != null) {
+                stringCodeMap.put(key, Integer.valueOf(stringCodeMap.size()));
+            }
+            writer.writeBoolean(false);
+            writer.writeUTF(key);
+        }
+    }
+
+    private void writeObjSet(Set set) throws IOException {
+        if(!set.isEmpty()){
+            writer.writeInt(set.size());
+            for(Object o : set){
+                if(o instanceof String){
+                    writeCodedString(o.toString());
+                } else {
+                    throw new IllegalArgumentException("Expected value type to be String but was : " +
+                        o.toString());
+                }
+            }
+        } else {
+            writer.writeInt(0);
+        }
+    }
+
+    private void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException {
+        Iterable<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
+        int size = Iterables.size(pathArguments);
+        writer.writeInt(size);
+
+        for(YangInstanceIdentifier.PathArgument pathArgument : pathArguments) {
+            writePathArgument(pathArgument);
+        }
+    }
+
+    private void writePathArgument(YangInstanceIdentifier.PathArgument pathArgument) throws IOException {
+
+        byte type = PathArgumentTypes.getSerializablePathArgumentType(pathArgument);
+
+        writer.writeByte(type);
+
+        switch(type) {
+            case PathArgumentTypes.NODE_IDENTIFIER :
+
+                YangInstanceIdentifier.NodeIdentifier nodeIdentifier =
+                    (YangInstanceIdentifier.NodeIdentifier) pathArgument;
+
+                writeQName(nodeIdentifier.getNodeType());
+                break;
+
+            case PathArgumentTypes.NODE_IDENTIFIER_WITH_PREDICATES:
+
+                YangInstanceIdentifier.NodeIdentifierWithPredicates nodeIdentifierWithPredicates =
+                    (YangInstanceIdentifier.NodeIdentifierWithPredicates) pathArgument;
+                writeQName(nodeIdentifierWithPredicates.getNodeType());
+
+                writeKeyValueMap(nodeIdentifierWithPredicates.getKeyValues());
+                break;
+
+            case PathArgumentTypes.NODE_IDENTIFIER_WITH_VALUE :
+
+                YangInstanceIdentifier.NodeWithValue nodeWithValue =
+                    (YangInstanceIdentifier.NodeWithValue) pathArgument;
+
+                writeQName(nodeWithValue.getNodeType());
+                writeObject(nodeWithValue.getValue());
+                break;
+
+            case PathArgumentTypes.AUGMENTATION_IDENTIFIER :
+
+                YangInstanceIdentifier.AugmentationIdentifier augmentationIdentifier =
+                    (YangInstanceIdentifier.AugmentationIdentifier) pathArgument;
+
+                // No Qname in augmentation identifier
+                writeQNameSet(augmentationIdentifier.getPossibleChildNames());
+                break;
+            default :
+                throw new IllegalStateException("Unknown node identifier type is found : " + pathArgument.getClass().toString() );
+        }
+    }
+
+    private void writeKeyValueMap(Map<QName, Object> keyValueMap) throws IOException {
+        if(keyValueMap != null && !keyValueMap.isEmpty()) {
+            writer.writeInt(keyValueMap.size());
+            Set<QName> qNameSet = keyValueMap.keySet();
+
+            for(QName qName : qNameSet) {
+                writeQName(qName);
+                writeObject(keyValueMap.get(qName));
+            }
+        } else {
+            writer.writeInt(0);
+        }
+    }
+
+    private void writeQNameSet(Set<QName> children) throws IOException {
+        // Write each child's qname separately, if list is empty send count as 0
+        if(children != null && !children.isEmpty()) {
+            writer.writeInt(children.size());
+            for(QName qName : children) {
+                writeQName(qName);
+            }
+        } else {
+            LOG.debug("augmentation node does not have any child");
+            writer.writeInt(0);
+        }
+    }
+
+    private void writeObject(Object value) throws IOException {
+
+        byte type = ValueTypes.getSerializableType(value);
+        // Write object type first
+        writer.writeByte(type);
+
+        switch(type) {
+            case ValueTypes.BOOL_TYPE:
+                writer.writeBoolean((Boolean) value);
+                break;
+            case ValueTypes.QNAME_TYPE:
+                writeQName((QName) value);
+                break;
+            case ValueTypes.INT_TYPE:
+                writer.writeInt((Integer) value);
+                break;
+            case ValueTypes.BYTE_TYPE:
+                writer.writeByte((Byte) value);
+                break;
+            case ValueTypes.LONG_TYPE:
+                writer.writeLong((Long) value);
+                break;
+            case ValueTypes.SHORT_TYPE:
+                writer.writeShort((Short) value);
+                break;
+            case ValueTypes.BITS_TYPE:
+                writeObjSet((Set) value);
+                break;
+            case ValueTypes.YANG_IDENTIFIER_TYPE:
+                writeYangInstanceIdentifier((YangInstanceIdentifier) value);
+                break;
+            default:
+                writer.writeUTF(value.toString());
+                break;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReader.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReader.java
new file mode 100644 (file)
index 0000000..c619afd
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ *
+ *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ *  and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.io.IOException;
+
+
+public interface NormalizedNodeStreamReader extends AutoCloseable {
+
+    NormalizedNode<?, ?> readNormalizedNode() throws IOException;
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamWriter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamWriter.java
new file mode 100644 (file)
index 0000000..af95b61
--- /dev/null
@@ -0,0 +1,219 @@
+
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Event Stream Writer based on Normalized Node tree representation
+ *
+ * <h3>Writing Event Stream</h3>
+ *
+ * <ul>
+ * <li><code>container</code> - Container node representation, start event is
+ * emitted using {@link #startContainerNode(YangInstanceIdentifier.NodeIdentifier, int)}
+ * and node end event is
+ * emitted using {@link #endNode()}. Container node is implementing
+ * {@link org.opendaylight.yangtools.yang.binding.DataObject} interface.
+ *
+ * <li><code>list</code> - YANG list statement has two representation in event
+ * stream - unkeyed list and map. Unkeyed list is YANG list which did not
+ * specify key.</li>
+ *
+ * <ul>
+ * <li><code>Map</code> - Map start event is emitted using
+ * {@link #startMapNode(YangInstanceIdentifier.NodeIdentifier, int)}
+ * and is ended using {@link #endNode()}. Each map entry start is emitted using
+ * {@link #startMapEntryNode(YangInstanceIdentifier.NodeIdentifierWithPredicates, int)}
+ * with Map of keys
+ * and finished using {@link #endNode()}.</li>
+ *
+ * <li><code>UnkeyedList</code> - Unkeyed list represent list without keys,
+ * unkeyed list start is emitted using
+ * {@link #startUnkeyedList(YangInstanceIdentifier.NodeIdentifier, int)} list
+ * end is emitted using {@link #endNode()}. Each list item is emitted using
+ * {@link #startUnkeyedListItem(YangInstanceIdentifier.NodeIdentifier, int)}
+ * and ended using {@link #endNode()}.</li>
+ * </ul>
+ *
+ * <li><code>leaf</code> - Leaf node event is emitted using
+ * {@link #leafNode(YangInstanceIdentifier.NodeIdentifier, Object)}.
+ * {@link #endNode()} MUST NOT BE emitted for
+ * leaf node.</li>
+ *
+ * <li><code>leaf-list</code> - Leaf list start is emitted using
+ * {@link #startLeafSet(YangInstanceIdentifier.NodeIdentifier, int)}.
+ * Leaf list end is emitted using
+ * {@link #endNode()}. Leaf list entries are emitted using
+ * {@link #leafSetEntryNode(YangInstanceIdentifier.NodeWithValue name, Object).
+ *
+ * <li><code>anyxml - Anyxml node event is emitted using
+ * {@link #leafNode(YangInstanceIdentifier.NodeIdentifier, Object)}. {@link #endNode()} MUST NOT BE emitted
+ * for anyxml node.</code></li>
+ *
+ *
+ * <li><code>choice</code> Choice node event is emmited by
+ * {@link #startChoiceNode(YangInstanceIdentifier.NodeIdentifier, int)} event and
+ * finished by invoking {@link #endNode()}
+ * <li>
+ * <code>augment</code> - Represents augmentation, augmentation node is started
+ * by invoking {@link #startAugmentationNode(YangInstanceIdentifier.AugmentationIdentifier)} and
+ * finished by invoking {@link #endNode()}.</li>
+ *
+ * </ul>
+ *
+ * <h3>Implementation notes</h3>
+ *
+ * <p>
+ * Implementations of this interface must not hold user suppled objects
+ * and resources needlessly.
+ *
+ */
+
+public interface NormalizedNodeStreamWriter extends Closeable, Flushable {
+
+    public final int UNKNOWN_SIZE = -1;
+
+    /**
+     * Write the leaf node identifier and value to the stream.
+     * @param name
+     * @param value
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * Start writing leaf Set node. You must call {@link #endNode()} once you are done writing all of its children.
+     * @param name
+     * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void startLeafSet(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * Write the leaf Set Entry Node object to the stream with identifier and value.
+     * @param name
+     * @param value
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void leafSetEntryNode(YangInstanceIdentifier.NodeWithValue name, Object value)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * Start writing container node. You must call {@link #endNode()} once you are done writing all of its children.
+     * @param name
+     * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void startContainerNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * Start writing unkeyed list node. You must call {@link #endNode()} once you are done writing all of its children.
+     * @param name
+     * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void startUnkeyedList(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * Start writing unkeyed list item. You must call {@link #endNode()} once you are done writing all of its children.
+     * @param name
+     * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+     * @throws IOException
+     * @throws IllegalStateException
+     */
+    void startUnkeyedListItem(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+        throws IOException, IllegalStateException;
+
+    /**
+     * Start writing map node. You must call {@link #endNode()} once you are done writing all of its children.
+     * @param name
+     * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void startMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * Start writing map entry node. You must call {@link #endNode()} once you are done writing all of its children.
+     * @param identifier
+     * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void startMapEntryNode(YangInstanceIdentifier.NodeIdentifierWithPredicates identifier, int childSizeHint)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * Start writing ordered map node. You must call {@link #endNode()} once you are done writing all of its children.
+     * @param name
+     * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void startOrderedMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * Start writing choice node. You must call {@link #endNode()} once you are done writing all of its children.
+     * @param name
+     * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void startChoiceNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * Start writing augmentation node. You must call {@link #endNode()} once you are done writing all of its children.
+     * @param identifier
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void startAugmentationNode(YangInstanceIdentifier.AugmentationIdentifier identifier)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * Write any xml node identifier and value to the stream
+     * @param name
+     * @param value
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    void anyxmlNode(YangInstanceIdentifier.NodeIdentifier name, Object value)
+        throws IOException, IllegalArgumentException;
+
+    /**
+     * This method should be used to add end symbol/identifier of node in the stream.
+     * @throws IOException
+     * @throws IllegalStateException
+     */
+    void endNode() throws IOException, IllegalStateException;
+
+    @Override
+    void close() throws IOException;
+
+    @Override
+    void flush() throws IOException;
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/PathArgumentTypes.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/PathArgumentTypes.java
new file mode 100644 (file)
index 0000000..b01beb8
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.collect.ImmutableMap;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+import java.util.Map;
+
+public class PathArgumentTypes {
+    public static final byte AUGMENTATION_IDENTIFIER = 1;
+    public static final byte NODE_IDENTIFIER = 2;
+    public static final byte NODE_IDENTIFIER_WITH_VALUE = 3;
+    public static final byte NODE_IDENTIFIER_WITH_PREDICATES = 4;
+
+    private static Map<Class<?>, Byte> CLASS_TO_ENUM_MAP =
+            ImmutableMap.<Class<?>, Byte>builder().
+                put(YangInstanceIdentifier.AugmentationIdentifier.class, AUGMENTATION_IDENTIFIER).
+                put(YangInstanceIdentifier.NodeIdentifier.class, NODE_IDENTIFIER).
+                put(YangInstanceIdentifier.NodeIdentifierWithPredicates.class, NODE_IDENTIFIER_WITH_PREDICATES).
+                put(YangInstanceIdentifier.NodeWithValue.class, NODE_IDENTIFIER_WITH_VALUE).build();
+
+    public static byte getSerializablePathArgumentType(YangInstanceIdentifier.PathArgument pathArgument){
+
+        Byte type = CLASS_TO_ENUM_MAP.get(pathArgument.getClass());
+        if(type == null) {
+            throw new IllegalArgumentException("Unknown type of PathArgument = " + pathArgument);
+        }
+
+        return type;
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ValueTypes.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ValueTypes.java
new file mode 100644 (file)
index 0000000..6035e3c
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class ValueTypes {
+    public static final byte SHORT_TYPE = 1;
+    public static final byte BYTE_TYPE = 2;
+    public static final byte INT_TYPE = 3;
+    public static final byte LONG_TYPE = 4;
+    public static final byte BOOL_TYPE = 5;
+    public static final byte QNAME_TYPE = 6;
+    public static final byte BITS_TYPE = 7;
+    public static final byte YANG_IDENTIFIER_TYPE = 8;
+    public static final byte STRING_TYPE = 9;
+    public static final byte BIG_INTEGER_TYPE = 10;
+    public static final byte BIG_DECIMAL_TYPE = 11;
+
+    private static Map<Class, Byte> types = new HashMap<>();
+
+    static {
+        types.put(String.class, Byte.valueOf(STRING_TYPE));
+        types.put(Byte.class, Byte.valueOf(BYTE_TYPE));
+        types.put(Integer.class, Byte.valueOf(INT_TYPE));
+        types.put(Long.class, Byte.valueOf(LONG_TYPE));
+        types.put(Boolean.class, Byte.valueOf(BOOL_TYPE));
+        types.put(QName.class, Byte.valueOf(QNAME_TYPE));
+        types.put(Set.class, Byte.valueOf(BITS_TYPE));
+        types.put(YangInstanceIdentifier.class, Byte.valueOf(YANG_IDENTIFIER_TYPE));
+        types.put(Short.class, Byte.valueOf(SHORT_TYPE));
+        types.put(BigInteger.class, Byte.valueOf(BIG_INTEGER_TYPE));
+        types.put(BigDecimal.class, Byte.valueOf(BIG_DECIMAL_TYPE));
+    }
+
+    public static final byte getSerializableType(Object node){
+        Preconditions.checkNotNull(node, "node should not be null");
+
+        Byte type = types.get(node.getClass());
+        if(type != null) {
+            return type;
+        } else if(node instanceof Set){
+            return BITS_TYPE;
+        }
+
+        throw new IllegalArgumentException("Unknown value type " + node.getClass().getSimpleName());
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/util/EncoderDecoderUtil.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/util/EncoderDecoderUtil.java
deleted file mode 100644 (file)
index 90f5cf3..0000000
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- *
- *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- *  This program and the accompanying materials are made available under the
- *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
- *  and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- */
-
-package org.opendaylight.controller.cluster.datastore.util;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.opendaylight.controller.protobuff.messages.common.SimpleNormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
-import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.DomUtils;
-import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory;
-import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.serializer.DomFromNormalizedNodeSerializerFactory;
-import org.opendaylight.yangtools.yang.model.api.AugmentationSchema;
-import org.opendaylight.yangtools.yang.model.api.ChoiceNode;
-import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
-import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.TransformerFactoryConfigurationError;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import java.io.ByteArrayInputStream;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-/*
- *
- * <code>EncoderDecoderUtil</code> helps in wrapping the NormalizedNode into a SimpleNormalizedNode
- * protobuf message containing the XML representation of the NormalizeNode
- *
- * @author: syedbahm
- */
-public class EncoderDecoderUtil {
-    static DocumentBuilderFactory factory;
-
-    private static DomFromNormalizedNodeSerializerFactory serializerFactory =
-        DomFromNormalizedNodeSerializerFactory
-            .getInstance(XmlDocumentUtils.getDocument(),
-                DomUtils.defaultValueCodecProvider());
-
-    private static DomToNormalizedNodeParserFactory parserFactory =
-        DomToNormalizedNodeParserFactory
-            .getInstance(DomUtils.defaultValueCodecProvider());
-
-    static {
-        factory = DocumentBuilderFactory.newInstance();
-        factory.setNamespaceAware(true);
-        factory.setCoalescing(true);
-        factory.setIgnoringElementContentWhitespace(true);
-        factory.setIgnoringComments(true);
-    }
-
-    private static DataSchemaNode findChildNode(Collection<DataSchemaNode> children,
-        String name) {
-        List<DataNodeContainer> containers = Lists.newArrayList();
-
-        for (DataSchemaNode dataSchemaNode : children) {
-            if (dataSchemaNode.getQName().getLocalName().equals(name))
-                return dataSchemaNode;
-            if (dataSchemaNode instanceof DataNodeContainer) {
-                containers.add((DataNodeContainer) dataSchemaNode);
-            } else if (dataSchemaNode instanceof ChoiceNode) {
-                containers.addAll(((ChoiceNode) dataSchemaNode).getCases());
-            }
-        }
-
-        for (DataNodeContainer container : containers) {
-            DataSchemaNode retVal =
-                findChildNode(container.getChildNodes(), name);
-            if (retVal != null) {
-                return retVal;
-            }
-        }
-
-        return null;
-    }
-
-    private static DataSchemaNode getSchemaNode(SchemaContext context,
-        QName qname) {
-
-        for (Module module : context
-            .findModuleByNamespace(qname.getNamespace())) {
-            // we will take the first child as the start of the
-            if (module.getChildNodes() != null || !module.getChildNodes()
-                .isEmpty()) {
-
-                DataSchemaNode found =
-                    findChildNode(module.getChildNodes(), qname.getLocalName());
-                return found;
-            }
-        }
-        return null;
-    }
-
-    private static String toString(Element xml) {
-        try {
-            Transformer transformer =
-                TransformerFactory.newInstance().newTransformer();
-            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
-            StreamResult result = new StreamResult(new StringWriter());
-            DOMSource source = new DOMSource(xml);
-            transformer.transform(source, result);
-
-            return result.getWriter().toString();
-        } catch (IllegalArgumentException | TransformerFactoryConfigurationError
-            | TransformerException e) {
-            throw new RuntimeException("Unable to serialize xml element " + xml,
-                e);
-        }
-    }
-
-  private static String toString(Iterable<Element> xmlIterable) {
-    try {
-      Transformer transformer =
-          TransformerFactory.newInstance().newTransformer();
-      transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
-      StreamResult result = new StreamResult(new StringWriter());
-      Iterator iterator = xmlIterable.iterator();
-      DOMSource source;
-      if(iterator.hasNext()) {
-        source = new DOMSource((org.w3c.dom.Node) iterator.next());
-        transformer.transform(source, result);
-        transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
-      }
-
-      while(iterator.hasNext()) {
-        source = new DOMSource((org.w3c.dom.Node) iterator.next());
-        transformer.transform(source, result);
-      }
-      System.out.println(result.getWriter().toString());
-      return result.getWriter().toString();
-    } catch (IllegalArgumentException | TransformerFactoryConfigurationError
-        | TransformerException e) {
-      throw new RuntimeException("Unable to serialize xml element(s) " + xmlIterable.toString(),
-          e);
-    }
-  }
-
-    private static Iterable<Element> serialize(DataSchemaNode schemaNode, NormalizedNode normalizedNode){
-        if(schemaNode instanceof ContainerSchemaNode){      //1
-            return serializerFactory
-                .getContainerNodeSerializer()
-                .serialize((ContainerSchemaNode) schemaNode,
-                    (ContainerNode) normalizedNode);
-        } else if(schemaNode instanceof ChoiceNode){        //2
-            return serializerFactory
-                .getChoiceNodeSerializer()
-                .serialize((ChoiceNode) schemaNode,
-                    (org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode) normalizedNode);
-        } else if(schemaNode instanceof LeafSchemaNode){    //3
-            return serializerFactory
-                .getLeafNodeSerializer()
-                .serialize((LeafSchemaNode) schemaNode, (LeafNode) normalizedNode);
-        } else if(schemaNode instanceof ListSchemaNode){    //4
-            return serializerFactory
-                .getMapNodeSerializer()
-                .serialize((ListSchemaNode) schemaNode, (MapNode) normalizedNode);
-        } else if(schemaNode instanceof LeafListSchemaNode){    //5
-            return serializerFactory
-                .getLeafSetNodeSerializer()
-                .serialize((LeafListSchemaNode) schemaNode, (LeafSetNode) normalizedNode);
-        } else if(schemaNode instanceof AugmentationSchema){//6
-            return serializerFactory
-                .getAugmentationNodeSerializer()
-                .serialize((AugmentationSchema) schemaNode, (AugmentationNode) normalizedNode);
-        } else if(schemaNode instanceof ListSchemaNode && normalizedNode instanceof LeafSetEntryNode){    //7
-            return serializerFactory
-                .getLeafSetEntryNodeSerializer()
-                .serialize((LeafListSchemaNode) schemaNode, (LeafSetEntryNode) normalizedNode);
-        } else if(schemaNode instanceof ListSchemaNode){    //8
-            return serializerFactory
-                .getMapEntryNodeSerializer()
-                .serialize((ListSchemaNode) schemaNode, (MapEntryNode) normalizedNode);
-        }
-
-
-
-        throw new UnsupportedOperationException(schemaNode.getClass().toString());
-    }
-
-    private static NormalizedNode parse(Document doc, DataSchemaNode schemaNode){
-        if(schemaNode instanceof ContainerSchemaNode){
-            return parserFactory
-                .getContainerNodeParser()
-                .parse(Collections.singletonList(doc.getDocumentElement()),
-                    (ContainerSchemaNode) schemaNode);
-
-        } else if(schemaNode instanceof ChoiceNode){
-            return parserFactory
-                .getChoiceNodeParser()
-                .parse(Collections.singletonList(doc.getDocumentElement()),
-                    (ChoiceNode) schemaNode);
-        } else if(schemaNode instanceof LeafNode){
-            return parserFactory
-                .getLeafNodeParser()
-                .parse(Collections.singletonList(doc.getDocumentElement()),
-                    (LeafSchemaNode) schemaNode);
-        } else if(schemaNode instanceof ListSchemaNode){
-            return parserFactory
-                .getMapNodeParser()
-                .parse(Collections.singletonList(doc.getDocumentElement()),
-                    (ListSchemaNode) schemaNode);
-        } else if(schemaNode instanceof LeafListSchemaNode){
-            return parserFactory
-                .getLeafSetNodeParser()
-                .parse(Collections.singletonList(doc.getDocumentElement()),
-                    (LeafListSchemaNode) schemaNode);
-        } else if(schemaNode instanceof AugmentationSchema){
-            return parserFactory
-                .getAugmentationNodeParser()
-                .parse(Collections.singletonList(doc.getDocumentElement()),
-                    (AugmentationSchema) schemaNode);
-        } else if(schemaNode instanceof ListSchemaNode){
-            return parserFactory
-                .getMapEntryNodeParser()
-                .parse(Collections.singletonList(doc.getDocumentElement()),
-                    (ListSchemaNode) schemaNode);
-
-        }
-
-        throw new UnsupportedOperationException(schemaNode.getClass().toString());
-    }
-
-
-    /**
-     * Helps in generation of NormalizedNodeXml message for the supplied NormalizedNode
-     *
-     * @param sc             --SchemaContext
-     * @param normalizedNode -- Normalized Node to be encoded
-     * @return SimpleNormalizedNodeMessage.NormalizedNodeXml
-     */
-    public static SimpleNormalizedNodeMessage.NormalizedNodeXml encode(
-        SchemaContext sc, NormalizedNode<?, ?> normalizedNode) {
-
-        Preconditions.checkArgument(sc != null, "Schema context found null");
-
-        Preconditions.checkArgument(normalizedNode != null,
-            "normalized node found null");
-
-        DataSchemaNode schemaNode = getSchemaNode(sc,
-            normalizedNode.getIdentifier()
-                .getNodeType()
-        );
-
-        Preconditions.checkState(schemaNode != null,
-            "Couldn't find schema node for " + normalizedNode.getIdentifier());
-
-        Iterable<Element> els = serialize(schemaNode, normalizedNode);
-
-        String xmlString = toString(els.iterator().next());
-        SimpleNormalizedNodeMessage.NormalizedNodeXml.Builder builder =
-            SimpleNormalizedNodeMessage.NormalizedNodeXml.newBuilder();
-        builder.setXmlString(xmlString);
-        builder
-            .setNodeIdentifier(normalizedNode.getIdentifier()
-                .getNodeType().toString());
-        return builder.build();
-
-    }
-
-    /**
-     * Utilizes the SimpleNormalizedNodeMessage.NormalizedNodeXml to convert into NormalizedNode
-     *
-     * @param sc                -- schema context
-     * @param normalizedNodeXml -- containing the normalized Node XML
-     * @return NormalizedNode return
-     * @throws Exception
-     */
-
-    public static NormalizedNode decode(SchemaContext sc,
-        SimpleNormalizedNodeMessage.NormalizedNodeXml normalizedNodeXml)
-        throws Exception {
-
-        Preconditions
-            .checkArgument(sc != null, "schema context seems to be null");
-
-        Preconditions.checkArgument(normalizedNodeXml != null,
-            "SimpleNormalizedNodeMessage.NormalizedNodeXml found to be null");
-        QName qname = QName.create(normalizedNodeXml.getNodeIdentifier());
-
-        // here we will try to get back the NormalizedNode
-        DataSchemaNode schemaNode = getSchemaNode(sc, qname);
-
-        // now we need to read the XML
-        Document doc =
-            factory.newDocumentBuilder().parse(
-                new ByteArrayInputStream(
-                    normalizedNodeXml.getXmlString().getBytes(
-                        "utf-8"))
-            );
-
-        doc.getDocumentElement().normalize();
-
-
-        return parse(doc, schemaNode);
-    }
-
-
-
-}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java
new file mode 100644 (file)
index 0000000..052f609
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ *
+ *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ *  and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.util.TestModel;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+public class NormalizedNodeStreamReaderWriterTest {
+
+    final NormalizedNode<?, ?> input = TestModel.createTestContainer();
+
+    @Test
+    public void testNormalizedNodeStreamReaderWriter() {
+
+        byte[] byteData = null;
+
+        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+            NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream)) {
+
+            NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer);
+            normalizedNodeWriter.write(input);
+            byteData = byteArrayOutputStream.toByteArray();
+
+        } catch (IOException e) {
+            fail("Writing to OutputStream failed :" + e.toString());
+        }
+
+        try(NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(new ByteArrayInputStream(byteData))) {
+
+            NormalizedNode<?,?> node = reader.readNormalizedNode();
+            Assert.assertEquals(input, node);
+
+        } catch (IOException e) {
+            fail("Reading from InputStream failed :" + e.toString());
+        }
+    }
+
+    @Test
+    public void testWithSerializable() {
+        SampleNormalizedNodeSerializable serializable = new SampleNormalizedNodeSerializable(input);
+        SampleNormalizedNodeSerializable clone = (SampleNormalizedNodeSerializable)SerializationUtils.clone(serializable);
+
+        Assert.assertEquals(input, clone.getInput());
+
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeWriter.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeWriter.java
new file mode 100644 (file)
index 0000000..845038e
--- /dev/null
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
+import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.OrderedMapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListNode;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeStreamWriter.UNKNOWN_SIZE;
+
+
+/**
+ * This class is used only for testing purpose for now, we may use similar logic while integrating
+ * with cluster
+ */
+
+public class NormalizedNodeWriter implements Closeable, Flushable {
+    private final NormalizedNodeStreamWriter writer;
+
+    private NormalizedNodeWriter(final NormalizedNodeStreamWriter writer) {
+        this.writer = Preconditions.checkNotNull(writer);
+    }
+
+    protected final NormalizedNodeStreamWriter getWriter() {
+        return writer;
+    }
+
+    /**
+     * Create a new writer backed by a {@link org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter}.
+     *
+     * @param writer Back-end writer
+     * @return A new instance.
+     */
+    public static NormalizedNodeWriter forStreamWriter(final NormalizedNodeStreamWriter writer) {
+        return new NormalizedNodeWriter(writer);
+    }
+
+
+    /**
+     * Iterate over the provided {@link org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode} and emit write
+     * events to the encapsulated {@link org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter}.
+     *
+     * @param node Node
+     * @return
+     * @throws java.io.IOException when thrown from the backing writer.
+     */
+    public final NormalizedNodeWriter write(final NormalizedNode<?, ?> node) throws IOException {
+        if (wasProcessedAsComplexNode(node)) {
+            return this;
+        }
+
+        if (wasProcessAsSimpleNode(node)) {
+            return this;
+        }
+
+        throw new IllegalStateException("It wasn't possible to serialize node " + node);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        writer.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        writer.flush();
+        writer.close();
+    }
+
+    /**
+     * Emit a best guess of a hint for a particular set of children. It evaluates the
+     * iterable to see if the size can be easily gotten to. If it is, we hint at the
+     * real number of child nodes. Otherwise we emit UNKNOWN_SIZE.
+     *
+     * @param children Child nodes
+     * @return Best estimate of the collection size required to hold all the children.
+     */
+    static final int childSizeHint(final Iterable<?> children) {
+        return (children instanceof Collection) ? ((Collection<?>) children).size() : UNKNOWN_SIZE;
+    }
+
+    private boolean wasProcessAsSimpleNode(final NormalizedNode<?, ?> node) throws IOException {
+        if (node instanceof LeafSetEntryNode) {
+            final LeafSetEntryNode<?> nodeAsLeafList = (LeafSetEntryNode<?>)node;
+            writer.leafSetEntryNode(nodeAsLeafList.getIdentifier(), nodeAsLeafList.getValue());
+            return true;
+        } else if (node instanceof LeafNode) {
+            final LeafNode<?> nodeAsLeaf = (LeafNode<?>)node;
+            writer.leafNode(nodeAsLeaf.getIdentifier(), nodeAsLeaf.getValue());
+            return true;
+        } else if (node instanceof AnyXmlNode) {
+            final AnyXmlNode anyXmlNode = (AnyXmlNode)node;
+            writer.anyxmlNode(anyXmlNode.getIdentifier(), anyXmlNode.getValue());
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Emit events for all children and then emit an endNode() event.
+     *
+     * @param children Child iterable
+     * @return True
+     * @throws java.io.IOException when the writer reports it
+     */
+    protected final boolean writeChildren(final Iterable<? extends NormalizedNode<?, ?>> children) throws IOException {
+        for (NormalizedNode<?, ?> child : children) {
+            write(child);
+        }
+
+        writer.endNode();
+        return true;
+    }
+
+    protected boolean writeMapEntryNode(final MapEntryNode node) throws IOException {
+        writer.startMapEntryNode(node.getIdentifier(), childSizeHint(node.getValue()));
+        return writeChildren(node.getValue());
+    }
+
+    private boolean wasProcessedAsComplexNode(final NormalizedNode<?, ?> node) throws IOException {
+        if (node instanceof ContainerNode) {
+            final ContainerNode n = (ContainerNode) node;
+            writer.startContainerNode(n.getIdentifier(), childSizeHint(n.getValue()));
+            return writeChildren(n.getValue());
+        }
+        if (node instanceof MapEntryNode) {
+            return writeMapEntryNode((MapEntryNode) node);
+        }
+        if (node instanceof UnkeyedListEntryNode) {
+            final UnkeyedListEntryNode n = (UnkeyedListEntryNode) node;
+            writer.startUnkeyedListItem(n.getIdentifier(), childSizeHint(n.getValue()));
+            return writeChildren(n.getValue());
+        }
+        if (node instanceof ChoiceNode) {
+            final ChoiceNode n = (ChoiceNode) node;
+            writer.startChoiceNode(n.getIdentifier(), childSizeHint(n.getValue()));
+            return writeChildren(n.getValue());
+        }
+        if (node instanceof AugmentationNode) {
+            final AugmentationNode n = (AugmentationNode) node;
+            writer.startAugmentationNode(n.getIdentifier());
+            return writeChildren(n.getValue());
+        }
+        if (node instanceof UnkeyedListNode) {
+            final UnkeyedListNode n = (UnkeyedListNode) node;
+            writer.startUnkeyedList(n.getIdentifier(), childSizeHint(n.getValue()));
+            return writeChildren(n.getValue());
+        }
+        if (node instanceof OrderedMapNode) {
+            final OrderedMapNode n = (OrderedMapNode) node;
+            writer.startOrderedMapNode(n.getIdentifier(), childSizeHint(n.getValue()));
+            return writeChildren(n.getValue());
+        }
+        if (node instanceof MapNode) {
+            final MapNode n = (MapNode) node;
+            writer.startMapNode(n.getIdentifier(), childSizeHint(n.getValue()));
+            return writeChildren(n.getValue());
+        }
+        if (node instanceof LeafSetNode) {
+            //covers also OrderedLeafSetNode for which doesn't exist start* method
+            final LeafSetNode<?> n = (LeafSetNode<?>) node;
+            writer.startLeafSet(n.getIdentifier(), childSizeHint(n.getValue()));
+            return writeChildren(n.getValue());
+        }
+
+        return false;
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/SampleNormalizedNodeSerializable.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/SampleNormalizedNodeSerializable.java
new file mode 100644 (file)
index 0000000..33d48a5
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+
+public class SampleNormalizedNodeSerializable implements Serializable {
+
+    private NormalizedNode<?, ?> input;
+
+    public SampleNormalizedNodeSerializable(NormalizedNode<?, ?> input) {
+        this.input = input;
+    }
+
+    public NormalizedNode<?, ?> getInput() {
+        return input;
+    }
+
+    private void readObject(final ObjectInputStream stream) throws IOException, ClassNotFoundException, URISyntaxException {
+        NormalizedNodeStreamReader reader = new NormalizedNodeInputStreamReader(stream);
+        this.input = reader.readNormalizedNode();
+    }
+
+    private void writeObject(final ObjectOutputStream stream) throws IOException {
+        NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(stream);
+        NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer);
+
+        normalizedNodeWriter.write(this.input);
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/util/NormalizedNodeXmlConverterTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/util/NormalizedNodeXmlConverterTest.java
deleted file mode 100644 (file)
index 853b3e2..0000000
+++ /dev/null
@@ -1,483 +0,0 @@
-/*
- *
- *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- *  This program and the accompanying materials are made available under the
- *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
- *  and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- */
-package org.opendaylight.controller.cluster.datastore.util;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.custommonkey.xmlunit.Diff;
-import org.custommonkey.xmlunit.XMLUnit;
-import org.junit.Test;
-import org.opendaylight.controller.protobuff.messages.common.SimpleNormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.DomUtils;
-import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory;
-import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.serializer.DomFromNormalizedNodeSerializerFactory;
-import org.opendaylight.yangtools.yang.model.api.ChoiceNode;
-import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
-import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.xml.sax.SAXException;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.TransformerFactoryConfigurationError;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.URI;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * Two of the testcases in the yangtools/yang-data-impl are leveraged (with modification) to create
- * the serialization of NormalizedNode using the ProtocolBuffer
- *
- * @syedbahm
- *
- */
-
-
-public class NormalizedNodeXmlConverterTest {
-  private static final Logger logger = LoggerFactory
-      .getLogger(NormalizedNodeXmlConverterTest.class);
-  public static final String NAMESPACE =
-      "urn:opendaylight:params:xml:ns:yang:controller:test";
-  private static Date revision;
-  private ContainerNode expectedNode;
-  private ContainerSchemaNode containerNode;
-  private String xmlPath;
-
-  static {
-    try {
-      revision = new SimpleDateFormat("yyyy-MM-dd").parse("2014-03-13");
-    } catch (ParseException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static DataSchemaNode getSchemaNode(final SchemaContext context,
-      final String moduleName, final String childNodeName) {
-    for (Module module : context.getModules()) {
-      if (module.getName().equals(moduleName)) {
-        DataSchemaNode found =
-            findChildNode(module.getChildNodes(), childNodeName);
-        Preconditions.checkState(found != null, "Unable to find %s",
-            childNodeName);
-        return found;
-      }
-    }
-    throw new IllegalStateException("Unable to find child node "
-        + childNodeName);
-  }
-
-  static DataSchemaNode findChildNode(
-      final Collection<DataSchemaNode> children, final String name) {
-    List<DataNodeContainer> containers = Lists.newArrayList();
-
-    for (DataSchemaNode dataSchemaNode : children) {
-      if (dataSchemaNode.getQName().getLocalName().equals(name)) {
-        return dataSchemaNode;
-      }
-      if (dataSchemaNode instanceof DataNodeContainer) {
-        containers.add((DataNodeContainer) dataSchemaNode);
-      } else if (dataSchemaNode instanceof ChoiceNode) {
-        containers.addAll(((ChoiceNode) dataSchemaNode).getCases());
-      }
-    }
-
-    for (DataNodeContainer container : containers) {
-      DataSchemaNode retVal = findChildNode(container.getChildNodes(), name);
-      if (retVal != null) {
-        return retVal;
-      }
-    }
-
-    return null;
-  }
-
-  public static YangInstanceIdentifier.NodeIdentifier getNodeIdentifier(
-      final String localName) {
-    return new YangInstanceIdentifier.NodeIdentifier(QName.create(
-        URI.create(NAMESPACE), revision, localName));
-  }
-
-  public static YangInstanceIdentifier.AugmentationIdentifier getAugmentIdentifier(
-      final String... childNames) {
-    Set<QName> qn = Sets.newHashSet();
-
-    for (String childName : childNames) {
-      qn.add(getNodeIdentifier(childName).getNodeType());
-    }
-
-    return new YangInstanceIdentifier.AugmentationIdentifier(qn);
-  }
-
-
-  public static ContainerNode augmentChoiceExpectedNode() {
-
-    DataContainerNodeBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> b =
-        Builders.containerBuilder();
-    b.withNodeIdentifier(getNodeIdentifier("container"));
-
-    b.withChild(Builders
-        .choiceBuilder()
-        .withNodeIdentifier(getNodeIdentifier("ch2"))
-        .withChild(
-            Builders.leafBuilder()
-                .withNodeIdentifier(getNodeIdentifier("c2Leaf")).withValue("2")
-                .build())
-        .withChild(
-            Builders
-                .choiceBuilder()
-                .withNodeIdentifier(getNodeIdentifier("c2DeepChoice"))
-                .withChild(
-                    Builders
-                        .leafBuilder()
-                        .withNodeIdentifier(
-                            getNodeIdentifier("c2DeepChoiceCase1Leaf2"))
-                        .withValue("2").build()).build()).build());
-
-    b.withChild(Builders
-        .choiceBuilder()
-        .withNodeIdentifier(getNodeIdentifier("ch3"))
-        .withChild(
-            Builders.leafBuilder()
-                .withNodeIdentifier(getNodeIdentifier("c3Leaf")).withValue("3")
-                .build()).build());
-
-    b.withChild(Builders
-        .augmentationBuilder()
-        .withNodeIdentifier(getAugmentIdentifier("augLeaf"))
-        .withChild(
-            Builders.leafBuilder()
-                .withNodeIdentifier(getNodeIdentifier("augLeaf"))
-                .withValue("augment").build()).build());
-
-    b.withChild(Builders
-        .augmentationBuilder()
-        .withNodeIdentifier(getAugmentIdentifier("ch"))
-        .withChild(
-            Builders
-                .choiceBuilder()
-                .withNodeIdentifier(getNodeIdentifier("ch"))
-                .withChild(
-                    Builders.leafBuilder()
-                        .withNodeIdentifier(getNodeIdentifier("c1Leaf"))
-                        .withValue("1").build())
-                .withChild(
-                    Builders
-                        .augmentationBuilder()
-                        .withNodeIdentifier(
-                            getAugmentIdentifier("c1Leaf_AnotherAugment",
-                                "deepChoice"))
-                        .withChild(
-                            Builders
-                                .leafBuilder()
-                                .withNodeIdentifier(
-                                    getNodeIdentifier("c1Leaf_AnotherAugment"))
-                                .withValue("1").build())
-                        .withChild(
-                            Builders
-                                .choiceBuilder()
-                                .withNodeIdentifier(
-                                    getNodeIdentifier("deepChoice"))
-                                .withChild(
-                                    Builders
-                                        .leafBuilder()
-                                        .withNodeIdentifier(
-                                            getNodeIdentifier("deepLeafc1"))
-                                        .withValue("1").build()).build())
-                        .build()).build()).build());
-
-    return b.build();
-  }
-
-
-
-  public void init(final String yangPath, final String xmlPath,
-      final ContainerNode expectedNode) throws Exception {
-    SchemaContext schema = parseTestSchema(yangPath);
-    this.xmlPath = xmlPath;
-    this.containerNode =
-        (ContainerSchemaNode) getSchemaNode(schema, "test", "container");
-    this.expectedNode = expectedNode;
-  }
-
-  SchemaContext parseTestSchema(final String yangPath) throws Exception {
-
-    YangParserImpl yangParserImpl = new YangParserImpl();
-    InputStream stream =
-        NormalizedNodeXmlConverterTest.class.getResourceAsStream(yangPath);
-    ArrayList<InputStream> al = new ArrayList<InputStream>();
-    al.add(stream);
-    Set<Module> modules = yangParserImpl.parseYangModelsFromStreams(al);
-    return yangParserImpl.resolveSchemaContext(modules);
-
-  }
-
-
-  @Test
-  public void testConversionWithAugmentChoice() throws Exception {
-    init("/augment_choice.yang", "/augment_choice.xml",
-        augmentChoiceExpectedNode());
-    Document doc = loadDocument(xmlPath);
-
-    ContainerNode built =
-        DomToNormalizedNodeParserFactory
-            .getInstance(DomUtils.defaultValueCodecProvider())
-            .getContainerNodeParser()
-            .parse(Collections.singletonList(doc.getDocumentElement()),
-                containerNode);
-
-    if (expectedNode != null) {
-      junit.framework.Assert.assertEquals(expectedNode, built);
-    }
-
-    logger.info("{}", built);
-
-    Iterable<Element> els =
-        DomFromNormalizedNodeSerializerFactory
-            .getInstance(XmlDocumentUtils.getDocument(),
-                DomUtils.defaultValueCodecProvider())
-            .getContainerNodeSerializer().serialize(containerNode, built);
-
-    Element el = els.iterator().next();
-
-    XMLUnit.setIgnoreWhitespace(true);
-    XMLUnit.setIgnoreComments(true);
-
-    System.out.println(toString(doc.getDocumentElement()));
-    System.out.println(toString(el));
-
-    new Diff(XMLUnit.buildControlDocument(toString(doc.getDocumentElement())),
-        XMLUnit.buildTestDocument(toString(el))).similar();
-  }
-
-  private static ContainerNode listLeafListWithAttributes() {
-    DataContainerNodeBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> b =
-        Builders.containerBuilder();
-    b.withNodeIdentifier(getNodeIdentifier("container"));
-
-    CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder =
-        Builders.mapBuilder().withNodeIdentifier(getNodeIdentifier("list"));
-
-    Map<QName, Object> predicates = Maps.newHashMap();
-    predicates.put(getNodeIdentifier("uint32InList").getNodeType(), 3L);
-
-    DataContainerNodeBuilder<YangInstanceIdentifier.NodeIdentifierWithPredicates, MapEntryNode> list1Builder =
-        Builders.mapEntryBuilder().withNodeIdentifier(
-            new YangInstanceIdentifier.NodeIdentifierWithPredicates(
-                getNodeIdentifier("list").getNodeType(), predicates));
-    NormalizedNodeBuilder<YangInstanceIdentifier.NodeIdentifier, Object, LeafNode<Object>> uint32InListBuilder =
-        Builders.leafBuilder().withNodeIdentifier(
-            getNodeIdentifier("uint32InList"));
-
-    list1Builder.withChild(uint32InListBuilder.withValue(3L).build());
-
-    listBuilder.withChild(list1Builder.build());
-    b.withChild(listBuilder.build());
-
-    NormalizedNodeBuilder<YangInstanceIdentifier.NodeIdentifier, Object, LeafNode<Object>> booleanBuilder =
-        Builders.leafBuilder().withNodeIdentifier(getNodeIdentifier("boolean"));
-    booleanBuilder.withValue(false);
-    b.withChild(booleanBuilder.build());
-
-    ListNodeBuilder<Object, LeafSetEntryNode<Object>> leafListBuilder =
-        Builders.leafSetBuilder().withNodeIdentifier(
-            getNodeIdentifier("leafList"));
-
-    NormalizedNodeBuilder<YangInstanceIdentifier.NodeWithValue, Object, LeafSetEntryNode<Object>> leafList1Builder =
-        Builders.leafSetEntryBuilder().withNodeIdentifier(
-            new YangInstanceIdentifier.NodeWithValue(getNodeIdentifier(
-                "leafList").getNodeType(), "a"));
-
-    leafList1Builder.withValue("a");
-
-    leafListBuilder.withChild(leafList1Builder.build());
-    b.withChild(leafListBuilder.build());
-
-    return b.build();
-  }
-
-
-  @Test
-  public void testConversionWithAttributes() throws Exception {
-    init("/test.yang", "/simple_xml_with_attributes.xml",
-        listLeafListWithAttributes());
-    Document doc = loadDocument(xmlPath);
-
-    ContainerNode built =
-        DomToNormalizedNodeParserFactory
-            .getInstance(DomUtils.defaultValueCodecProvider())
-            .getContainerNodeParser()
-            .parse(Collections.singletonList(doc.getDocumentElement()),
-                containerNode);
-
-    if (expectedNode != null) {
-      junit.framework.Assert.assertEquals(expectedNode, built);
-    }
-
-    logger.info("{}", built);
-
-    Iterable<Element> els =
-        DomFromNormalizedNodeSerializerFactory
-            .getInstance(XmlDocumentUtils.getDocument(),
-                DomUtils.defaultValueCodecProvider())
-            .getContainerNodeSerializer().serialize(containerNode, built);
-
-    Element el = els.iterator().next();
-
-    XMLUnit.setIgnoreWhitespace(true);
-    XMLUnit.setIgnoreComments(true);
-
-    System.out.println(toString(doc.getDocumentElement()));
-    System.out.println(toString(el));
-
-    new Diff(XMLUnit.buildControlDocument(toString(doc.getDocumentElement())),
-        XMLUnit.buildTestDocument(toString(el))).similar();
-  }
-
-
-  private Document loadDocument(final String xmlPath) throws Exception {
-    InputStream resourceAsStream =
-        NormalizedNodeXmlConverterTest.class.getResourceAsStream(xmlPath);
-
-    Document currentConfigElement = readXmlToDocument(resourceAsStream);
-    Preconditions.checkNotNull(currentConfigElement);
-    return currentConfigElement;
-  }
-
-  private static final DocumentBuilderFactory BUILDERFACTORY;
-
-  static {
-    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-    factory.setNamespaceAware(true);
-    factory.setCoalescing(true);
-    factory.setIgnoringElementContentWhitespace(true);
-    factory.setIgnoringComments(true);
-    BUILDERFACTORY = factory;
-  }
-
-  private Document readXmlToDocument(final InputStream xmlContent)
-      throws IOException, SAXException {
-    DocumentBuilder dBuilder;
-    try {
-      dBuilder = BUILDERFACTORY.newDocumentBuilder();
-    } catch (ParserConfigurationException e) {
-      throw new RuntimeException("Failed to parse XML document", e);
-    }
-    Document doc = dBuilder.parse(xmlContent);
-
-    doc.getDocumentElement().normalize();
-    return doc;
-  }
-
-  public static String toString(final Element xml) {
-    try {
-      Transformer transformer =
-          TransformerFactory.newInstance().newTransformer();
-      transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
-      StreamResult result = new StreamResult(new StringWriter());
-      DOMSource source = new DOMSource(xml);
-      transformer.transform(source, result);
-
-      return result.getWriter().toString();
-    } catch (IllegalArgumentException | TransformerFactoryConfigurationError
-        | TransformerException e) {
-      throw new RuntimeException("Unable to serialize xml element " + xml, e);
-    }
-  }
-
-  @Test
-  public void testConversionToNormalizedXml() throws Exception {
-    SimpleNormalizedNodeMessage.NormalizedNodeXml nnXml =
-        EncoderDecoderUtil.encode(parseTestSchema("/augment_choice.yang"),
-            augmentChoiceExpectedNode());
-    Document expectedDoc = loadDocument("/augment_choice.xml");
-    Document convertedDoc =
-        EncoderDecoderUtil.factory.newDocumentBuilder().parse(
-            new ByteArrayInputStream(nnXml.getXmlString().getBytes("utf-8")));
-    System.out.println(toString(convertedDoc.getDocumentElement()));
-    XMLUnit.setIgnoreWhitespace(true);
-    XMLUnit.setIgnoreComments(true);
-    new Diff(XMLUnit.buildControlDocument(toString(expectedDoc
-        .getDocumentElement())),
-        XMLUnit.buildTestDocument(toString(convertedDoc.getDocumentElement())))
-        .similar();
-    System.out.println(toString(expectedDoc.getDocumentElement()));
-
-  }
-
-
-  @Test
-  public void testConversionFromXmlToNormalizedNode() throws Exception {
-    SimpleNormalizedNodeMessage.NormalizedNodeXml nnXml =
-        EncoderDecoderUtil.encode(parseTestSchema("/test.yang"),
-            listLeafListWithAttributes());
-    Document expectedDoc = loadDocument("/simple_xml_with_attributes.xml");
-    Document convertedDoc =
-        EncoderDecoderUtil.factory.newDocumentBuilder().parse(
-            new ByteArrayInputStream(nnXml.getXmlString().getBytes("utf-8")));
-    System.out.println(toString(convertedDoc.getDocumentElement()));
-    XMLUnit.setIgnoreWhitespace(true);
-    XMLUnit.setIgnoreComments(true);
-    new Diff(XMLUnit.buildControlDocument(toString(expectedDoc
-        .getDocumentElement())),
-        XMLUnit.buildTestDocument(toString(convertedDoc.getDocumentElement())))
-        .similar();
-    System.out.println(toString(expectedDoc.getDocumentElement()));
-
-    // now we will try to convert xml back to normalize node.
-    ContainerNode cn =
-        (ContainerNode) EncoderDecoderUtil.decode(
-            parseTestSchema("/test.yang"), nnXml);
-    junit.framework.Assert.assertEquals(listLeafListWithAttributes(), cn);
-
-  }
-
-}
index 8910137..857510a 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.base.Preconditions;
 public class ClusterWrapperImpl implements ClusterWrapper {
     private final Cluster cluster;
     private final String currentMemberName;
+    private final String selfAddress;
 
     public ClusterWrapperImpl(ActorSystem actorSystem){
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
@@ -31,6 +32,7 @@ public class ClusterWrapperImpl implements ClusterWrapper {
         );
 
         currentMemberName = (String) cluster.getSelfRoles().toArray()[0];
+        selfAddress = cluster.selfAddress().toString();
 
     }
 
@@ -45,4 +47,8 @@ public class ClusterWrapperImpl implements ClusterWrapper {
     public String getCurrentMemberName() {
         return currentMemberName;
     }
+
+    public String getSelfAddress() {
+        return selfAddress;
+    }
 }
index a498826..9a77e4d 100644 (file)
@@ -19,8 +19,12 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DataChangeListener extends AbstractUntypedActor {
+    private static final Logger LOG = LoggerFactory.getLogger(DataChangeListener.class);
+
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
     private boolean notificationsEnabled = false;
 
@@ -29,7 +33,8 @@ public class DataChangeListener extends AbstractUntypedActor {
         this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
     }
 
-    @Override public void handleReceive(Object message) throws Exception {
+    @Override
+    public void handleReceive(Object message) throws Exception {
         if(message instanceof DataChanged){
             dataChanged(message);
         } else if(message instanceof EnableNotification){
@@ -39,18 +44,24 @@ public class DataChangeListener extends AbstractUntypedActor {
 
     private void enableNotification(EnableNotification message) {
         notificationsEnabled = message.isEnabled();
+        LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"),
+                listener);
     }
 
     private void dataChanged(Object message) {
 
         // Do nothing if notifications are not enabled
-        if(!notificationsEnabled){
+        if(!notificationsEnabled) {
+            LOG.debug("Notifications not enabled for listener {} - dropping change notification",
+                    listener);
             return;
         }
 
         DataChanged reply = (DataChanged) message;
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
-            change = reply.getChange();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = reply.getChange();
+
+        LOG.debug("Sending change notification {} to listener {}", change, listener);
+
         this.listener.onDataChanged(change);
 
         // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
index acf630e..b2ae060 100644 (file)
@@ -8,14 +8,27 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import scala.concurrent.Future;
 
 /**
  * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
@@ -24,25 +37,36 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
  * </p>
  */
+@SuppressWarnings("rawtypes")
 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
+
+    public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
+
     private volatile ActorSelection listenerRegistrationActor;
-    private final AsyncDataChangeListener listener;
-    private final ActorRef dataChangeListenerActor;
+    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+    private ActorRef dataChangeListenerActor;
+    private final String shardName;
+    private final ActorContext actorContext;
     private boolean closed = false;
 
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-    DataChangeListenerRegistrationProxy(
-        ActorSelection listenerRegistrationActor,
-        L listener, ActorRef dataChangeListenerActor) {
-        this.listenerRegistrationActor = listenerRegistrationActor;
+                                                              DataChangeListenerRegistrationProxy (
+            String shardName, ActorContext actorContext, L listener) {
+        this.shardName = shardName;
+        this.actorContext = actorContext;
         this.listener = listener;
-        this.dataChangeListenerActor = dataChangeListenerActor;
     }
 
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-    DataChangeListenerRegistrationProxy(
-        L listener, ActorRef dataChangeListenerActor) {
-        this(null, listener, dataChangeListenerActor);
+    @VisibleForTesting
+    ActorSelection getListenerRegistrationActor() {
+        return listenerRegistrationActor;
+    }
+
+    @VisibleForTesting
+    ActorRef getDataChangeListenerActor() {
+        return dataChangeListenerActor;
     }
 
     @Override
@@ -50,7 +74,11 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         return listener;
     }
 
-    public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+    private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+        if(listenerRegistrationActor == null) {
+            return;
+        }
+
         boolean sendCloseMessage = false;
         synchronized(this) {
             if(closed) {
@@ -59,16 +87,55 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                 this.listenerRegistrationActor = listenerRegistrationActor;
             }
         }
+
         if(sendCloseMessage) {
             listenerRegistrationActor.tell(new
                 CloseDataChangeListenerRegistration().toSerializable(), null);
         }
+    }
 
-        this.listenerRegistrationActor = listenerRegistrationActor;
+    public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
+
+        dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+                DataChangeListener.props(listener));
+
+        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
+        findFuture.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(Throwable failure, ActorRef shard) {
+                if(failure instanceof LocalShardNotFoundException) {
+                    LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
+                            "cannot be registered", shardName, listener, path);
+                } else if(failure != null) {
+                    LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
+                            "cannot be registered: {}", shardName, listener, path, failure);
+                } else {
+                    doRegistration(shard, path, scope);
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
     }
 
-    public ActorSelection getListenerRegistrationActor() {
-        return listenerRegistrationActor;
+    private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
+            DataChangeScope scope) {
+
+        Future<Object> future = actorContext.executeOperationAsync(shard,
+                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+                REGISTER_TIMEOUT);
+
+        future.onComplete(new OnComplete<Object>(){
+            @Override
+            public void onComplete(Throwable failure, Object result) {
+                if(failure != null) {
+                    LOG.error("Failed to register DataChangeListener {} at path {}",
+                            listener, path.toString(), failure);
+                } else {
+                    RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+                    setListenerRegistrationActor(actorContext.actorSelection(
+                            reply.getListenerRegistrationPath()));
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
     }
 
     @Override
@@ -79,11 +146,16 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
             sendCloseMessage = !closed && listenerRegistrationActor != null;
             closed = true;
         }
+
         if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new
-                CloseDataChangeListenerRegistration().toSerializable(), null);
+            listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(),
+                    ActorRef.noSender());
+            listenerRegistrationActor = null;
         }
 
-        dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
+        if(dataChangeListenerActor != null) {
+            dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            dataChangeListenerActor = null;
+        }
     }
 }
index f6c31aa..2c73807 100644 (file)
@@ -8,16 +8,10 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -34,7 +28,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
 
 /**
  *
@@ -83,39 +76,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
-        Optional<ActorRef> shard = actorContext.findLocalShard(shardName);
-
-        //if shard is NOT local
-        if (!shard.isPresent()) {
-            LOG.debug("No local shard for shardName {} was found so returning a noop registration", shardName);
-            return new NoOpDataChangeListenerRegistration(listener);
-        }
-        //if shard is local
-        ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props(listener));
-        Future future = actorContext.executeOperationAsync(shard.get(),
-                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
-                new Timeout(actorContext.getOperationDuration().$times(REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));
-
         final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
-                new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor);
-
-        future.onComplete(new OnComplete() {
-
-            @Override
-            public void onComplete(Throwable failure, Object result)
-                    throws Throwable {
-                if (failure != null) {
-                    LOG.error("Failed to register listener at path " + path.toString(), failure);
-                    return;
-                }
-                RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
-                listenerRegistrationProxy.setListenerRegistrationActor(actorContext
-                        .actorSelection(reply.getListenerRegistrationPath()));
-            }
-        }, actorContext.getActorSystem().dispatcher());
+                new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
+        listenerRegistrationProxy.init(path, scope);
 
         return listenerRegistrationProxy;
-
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpCohort.java
deleted file mode 100644 (file)
index eb28159..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.UntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-
-public class NoOpCohort extends UntypedActor {
-
-    @Override public void onReceive(Object message) throws Exception {
-        if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(new CanCommitTransactionReply(false).toSerializable(), getSelf());
-        } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(
-                new PreCommitTransactionReply().toSerializable(),
-                getSelf());
-        } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(new CommitTransactionReply().toSerializable(), getSelf());
-        } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(new AbortTransactionReply().toSerializable(), getSelf());
-        } else {
-            throw new Exception ("Not recognized message received,message="+message);
-        }
-
-    }
-}
-
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java
deleted file mode 100644 (file)
index 14af31e..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * When a consumer registers a data change listener and no local shard is
- * available to register that listener with then we return an instance of
- * NoOpDataChangeListenerRegistration
- *
- * <p>
- *
- * The NoOpDataChangeListenerRegistration as it's name suggests does
- * nothing when an operation is invoked on it
- */
-public class NoOpDataChangeListenerRegistration
-    implements ListenerRegistration {
-
-    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
-        listener;
-
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> NoOpDataChangeListenerRegistration(
-        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
-
-        this.listener = listener;
-    }
-
-    @Override
-    public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
-        return listener;
-    }
-
-    @Override public void close() {
-
-    }
-}
index fef7e22..789d51a 100644 (file)
@@ -34,9 +34,9 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@ -74,14 +74,14 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-import java.util.ArrayList;
+
+import javax.annotation.Nonnull;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -112,7 +112,10 @@ public class Shard extends RaftActor {
 
     private final ShardStats shardMBean;
 
-    private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+    private final List<ActorSelection> dataChangeListeners =  Lists.newArrayList();
+
+    private final List<DelayedListenerRegistration> delayedListenerRegistrations =
+                                                                       Lists.newArrayList();
 
     private final DatastoreContext datastoreContext;
 
@@ -216,6 +219,10 @@ public class Shard extends RaftActor {
 
         if (message instanceof RecoveryFailure){
             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+
+            // Even though recovery failed, we still need to finish our recovery, eg send the
+            // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
+            onRecoveryComplete();
         } else {
             super.onReceiveRecover(message);
         }
@@ -383,8 +390,11 @@ public class Shard extends RaftActor {
                 ready.getModification());
 
         // Return our actor path as we'll handle the three phase commit.
-        getSender().tell(new ReadyTransactionReply(Serialization.serializedActorPath(self())).
-                toSerializable(), getSelf());
+        ReadyTransactionReply readyTransactionReply =
+            new ReadyTransactionReply(Serialization.serializedActorPath(self()));
+        getSender().tell(
+            ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
+            getSelf());
     }
 
     private void handleAbortTransaction(AbortTransaction abort) {
@@ -568,53 +578,60 @@ public class Shard extends RaftActor {
         store.onGlobalContextUpdated(message.getSchemaContext());
     }
 
-    @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
+    @VisibleForTesting
+    void updateSchemaContext(SchemaContext schemaContext) {
         store.onGlobalContextUpdated(schemaContext);
     }
 
-    private void registerChangeListener(
-        RegisterChangeListener registerChangeListener) {
+    private void registerChangeListener(RegisterChangeListener registerChangeListener) {
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("registerDataChangeListener for {}", registerChangeListener
-                .getPath());
+        LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+
+        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                                     NormalizedNode<?, ?>>> registration;
+        if(isLeader()) {
+            registration = doChangeListenerRegistration(registerChangeListener);
+        } else {
+            LOG.debug("Shard is not the leader - delaying registration");
+
+            DelayedListenerRegistration delayedReg =
+                    new DelayedListenerRegistration(registerChangeListener);
+            delayedListenerRegistrations.add(delayedReg);
+            registration = delayedReg;
         }
 
+        ActorRef listenerRegistration = getContext().actorOf(
+                DataChangeListenerRegistration.props(registration));
+
+        LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+                    listenerRegistration.path());
 
-        ActorSelection dataChangeListenerPath = getContext()
-            .system().actorSelection(
-                registerChangeListener.getDataChangeListenerPath());
+        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+    }
 
+    private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                               NormalizedNode<?, ?>>> doChangeListenerRegistration(
+            RegisterChangeListener registerChangeListener) {
+
+        ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
+                registerChangeListener.getDataChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
         // If this shard is the leader then it will enable notifications else
         // it will not
-        dataChangeListenerPath
-            .tell(new EnableNotification(isLeader()), getSelf());
+        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
 
         // Now store a reference to the data change listener so it can be notified
         // at a later point if notifications should be enabled or disabled
         dataChangeListeners.add(dataChangeListenerPath);
 
-        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
-            listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
 
-        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            registration = store.registerChangeListener(registerChangeListener.getPath(),
-                listener, registerChangeListener.getScope());
-        ActorRef listenerRegistration =
-            getContext().actorOf(
-                DataChangeListenerRegistration.props(registration));
+        LOG.debug("Registering for path {}", registerChangeListener.getPath());
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug(
-                "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
-                , listenerRegistration.path().toString());
-        }
-
-        getSender()
-            .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
-                getSelf());
+        return store.registerChangeListener(registerChangeListener.getPath(), listener,
+                registerChangeListener.getScope());
     }
 
     private boolean isMetricsCaptureEnabled(){
@@ -695,12 +712,15 @@ public class Shard extends RaftActor {
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
-        // Schedule a message to be periodically sent to check if the current in-progress
-        // transaction should be expired and aborted.
-        FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
-        txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
-                period, period, getSelf(),
-                TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+        // Being paranoid here - this method should only be called once but just in case...
+        if(txCommitTimeoutCheckSchedule == null) {
+            // Schedule a message to be periodically sent to check if the current in-progress
+            // transaction should be expired and aborted.
+            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+            txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
+                    period, period, getSelf(),
+                    TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+        }
     }
 
     @Override
@@ -787,17 +807,28 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Override protected void onStateChanged() {
+    @Override
+    protected void onStateChanged() {
+        boolean isLeader = isLeader();
         for (ActorSelection dataChangeListener : dataChangeListeners) {
-            dataChangeListener
-                .tell(new EnableNotification(isLeader()), getSelf());
+            dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
+        }
+
+        if(isLeader) {
+            for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
+                if(!reg.isClosed()) {
+                    reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
+                }
+            }
+
+            delayedListenerRegistrations.clear();
         }
 
         shardMBean.setRaftState(getRaftState().name());
         shardMBean.setCurrentTerm(getCurrentTerm());
 
         // If this actor is no longer the leader close all the transaction chains
-        if(!isLeader()){
+        if(!isLeader){
             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
                 if(LOG.isDebugEnabled()) {
                     LOG.debug(
@@ -851,4 +882,45 @@ public class Shard extends RaftActor {
     ShardStats getShardMBean() {
         return shardMBean;
     }
+
+    private static class DelayedListenerRegistration implements
+        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
+
+        private volatile boolean closed;
+
+        private final RegisterChangeListener registerChangeListener;
+
+        private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                                             NormalizedNode<?, ?>>> delegate;
+
+        DelayedListenerRegistration(RegisterChangeListener registerChangeListener) {
+            this.registerChangeListener = registerChangeListener;
+        }
+
+        void setDelegate( ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                            NormalizedNode<?, ?>>> registration) {
+            this.delegate = registration;
+        }
+
+        boolean isClosed() {
+            return closed;
+        }
+
+        RegisterChangeListener getRegisterChangeListener() {
+            return registerChangeListener;
+        }
+
+        @Override
+        public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+            return delegate != null ? delegate.getInstance() : null;
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+            if(delegate != null) {
+                delegate.close();
+            }
+        }
+    }
 }
index 157f1cb..e861165 100644 (file)
@@ -25,6 +25,7 @@ import akka.persistence.RecoveryFailure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
@@ -163,7 +164,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("Initializing shard [{}]", shardName);
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
-            shardInformation.setShardInitialized(true);
+            shardInformation.setActorInitialized();
         }
     }
 
@@ -192,7 +193,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        sendResponse(shardInformation, new Supplier<Object>() {
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
             @Override
             public Object get() {
                 return new LocalShardFound(shardInformation.getActor());
@@ -200,9 +201,22 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {