BUG-832 Refactor netconf connector 93/6793/13
authorMaros Marsalek <mmarsale@cisco.com>
Tue, 29 Apr 2014 12:23:52 +0000 (14:23 +0200)
committerMaros Marsalek <mmarsale@cisco.com>
Mon, 2 Jun 2014 08:18:53 +0000 (08:18 +0000)
- Extract general API for remote connectors
- Split NetconfDevice into more classes
- Make data operations over netconf device in inventory binding aware (fixes 969)
- Add dependency on binding-broker
- Make connector crash if schema is not present for device
- Improve logging

Change-Id: Idf4fa4994b3ac067bd69e9cd629b6d8c225d7b77
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
39 files changed:
opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/99-netconf-connector.xml
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/DataReaderRouter.java
opendaylight/md-sal/sal-netconf-connector/pom.xml
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/MessageTransformer.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDevice.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceCommunicator.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceHandler.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/SchemaContextProviderFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/SchemaSourceProviderFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/package-info.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/InventoryUtils.java [deleted file]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java [deleted file]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java [deleted file]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfInventoryUtils.java [deleted file]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java [deleted file]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilities.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/UncancellableFuture.java [moved from opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/UncancellableFuture.java with 67% similarity]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/package-info.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceCommitHandler.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDataReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDatastoreAdapter.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceRpc.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTwoPhaseCommitTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/NetconfDeviceSchemaProviderFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/NetconfRemoteSchemaSourceProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/util/NetconfMessageTransformUtil.java [moved from opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.java with 50% similarity]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/FailedRpcResult.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/MessageCounter.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/RemoteDeviceId.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/package-info.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/yang/odl-sal-netconf-connector-cfg.yang
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/test/resources/schemas/test-module.yang [new file with mode: 0644]

index fcbec19..b4b433d 100644 (file)
             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netty">prefix:netty-timer</type>
             <name>global-timer</name>
           </timer>
+        </module>
+
+        <!-- Netconf dispatcher to be used by all netconf-connectors --> 
+        <module>
+          <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
+          <name>global-netconf-processing-executor-threadfactory</name>
+          <name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
+        </module>  
+        <module>
+          <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">prefix:threadpool-flexible</type>
+          <name>global-netconf-processing-executor</name>
+          <minThreadCount xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">1</minThreadCount>
+          <max-thread-count xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">4</max-thread-count>
+          <keepAliveMillis xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">600000</keepAliveMillis>
+          <threadFactory xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">
+            <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadfactory</type>
+            <name>global-netconf-processing-executor-threadfactory</name>
+          </threadFactory>
         </module>  
 
         <!-- Loopback connection to netconf server in controller using netconf-connector -->
             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netty">prefix:netty-event-executor</type>
             <name>global-event-executor</name>
           </event-executor>
+          <binding-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+            <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">prefix:binding-broker-osgi-registry</type>
+            <name>binding-osgi-broker</name>
+          </binding-registry>
           <dom-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
             <name>dom-broker</name>
             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf">prefix:netconf-client-dispatcher</type>
             <name>global-netconf-dispatcher</name>
           </client-dispatcher>
+          <processing-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+            <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadpool</type>
+            <name>global-netconf-processing-executor</name>
+          </processing-executor>
         </module>
       </modules>
 
             <provider>/modules/module[type='netconf-client-dispatcher'][name='global-netconf-dispatcher']</provider>
           </instance>
         </service>
+        <service>
+          <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadfactory</type>
+          <instance>
+            <name>global-netconf-processing-executor-threadfactory</name>
+            <provider>/modules/module[type='threadfactory-naming'][name='global-netconf-processing-executor-threadfactory']</provider>
+          </instance>
+        </service>
+        <service>
+          <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadpool</type>
+          <instance>
+            <name>global-netconf-processing-executor</name>
+            <provider>/modules/module[type='threadpool-flexible'][name='global-netconf-processing-executor']</provider>
+          </instance>
+        </service>
       </services>
 
     </data>
   <required-capabilities>
       <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf?module=odl-sal-netconf-connector-cfg&amp;revision=2013-10-28</capability>
       <capability>urn:opendaylight:params:xml:ns:yang:controller:config:netconf:client:dispatcher?module=odl-netconfig-client-cfg&amp;revision=2014-04-08</capability>
+      <capability>urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl?module=threadpool-impl&amp;revision=2013-04-05</capability>
+      <capability>urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible?module=threadpool-impl-flexible&amp;revision=2013-12-01</capability>
   </required-capabilities>
 </snapshot>
index 53423f6..ba9b2b7 100644 (file)
@@ -103,7 +103,7 @@ AbstractDataReadRouter<InstanceIdentifier, CompositeNode> {
                         "Only one simple node for key $s is allowed in node $s",
                         keyValue.getKey(), node);
                 checkState(
-                        simpleNode.get(0).getValue() == keyValue.getValue(),
+                        simpleNode.get(0).getValue().equals(keyValue.getValue()),
                         "Key node must equal to instance identifier value in node $s",
                         node);
                 ret.put(keyValue.getKey(), simpleNode.get(0));
index 9ad95c1..0dd2529 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-binding-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-binding-config</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>threadpool-config-api</artifactId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>logback-config</artifactId>
       <type>jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>mockito-configuration</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index 1839de1..f73d9cc 100644 (file)
@@ -10,38 +10,29 @@ package org.opendaylight.controller.config.yang.md.sal.connector.netconf;
 import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkCondition;
 import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkNotNull;
 
-import com.google.common.net.InetAddresses;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.concurrent.GlobalEventExecutor;
 import java.io.File;
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
-import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
 import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
 import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
-import org.opendaylight.controller.sal.connect.netconf.InventoryUtils;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.controller.sal.connect.netconf.NetconfDeviceListener;
-import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceSalFacade;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
 import org.opendaylight.yangtools.yang.model.util.repo.FilesystemSchemaCachingProvider;
 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
@@ -51,6 +42,10 @@ import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.net.InetAddresses;
+import io.netty.util.HashedWheelTimer;
+
 /**
  *
  */
@@ -58,22 +53,20 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
 {
     private static final Logger logger = LoggerFactory.getLogger(NetconfConnectorModule.class);
 
-    private static ExecutorService GLOBAL_PROCESSING_EXECUTOR = null;
     private static AbstractCachingSchemaSourceProvider<String, InputStream> GLOBAL_NETCONF_SOURCE_PROVIDER = null;
     private BundleContext bundleContext;
 
-    public NetconfConnectorModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+    public NetconfConnectorModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
 
-    public NetconfConnectorModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, NetconfConnectorModule oldModule, java.lang.AutoCloseable oldInstance) {
+    public NetconfConnectorModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, final NetconfConnectorModule oldModule, final java.lang.AutoCloseable oldInstance) {
         super(identifier, dependencyResolver, oldModule, oldInstance);
     }
 
     @Override
     protected void customValidation() {
         checkNotNull(getAddress(), addressJmxAttribute);
-        //checkState(getAddress().getIpv4Address() != null || getAddress().getIpv6Address() != null,"Address must be set.");
         checkNotNull(getPort(), portJmxAttribute);
         checkNotNull(getDomRegistry(), portJmxAttribute);
         checkNotNull(getDomRegistry(), domRegistryJmxAttribute);
@@ -96,41 +89,83 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
             checkNotNull(getPassword(), passwordJmxAttribute);
         }
 
+        // FIXME BUG 944 remove this warning
+        if(getBindingRegistry() == null) {
+            logger.warn("Configuration property: \"binding-registry\" not set for sal-netconf-connector (" + getIdentifier() + "). " +
+                    "Netconf-connector now requires a dependency on \"binding-broker-osgi-registry\". " +
+                    "The dependency is optional for now to preserve backwards compatibility, but will be mandatory in the future. " +
+                    "Please set the property as in \"01-netconf-connector\" initial config file. " +
+                    "The service will be retrieved from OSGi service registry now.");
+        }
+
+        // FIXME BUG 944 remove this warning
+        if(getProcessingExecutor() == null) {
+            logger.warn("Configuration property: \"processing-executor\" not set for sal-netconf-connector (" + getIdentifier() + "). " +
+                    "Netconf-connector now requires a dependency on \"threadpool\". " +
+                    "The dependency is optional for now to preserve backwards compatibility, but will be mandatory in the future. " +
+                    "Please set the property as in \"01-netconf-connector\" initial config file. " +
+                    "New instance will be created for the executor.");
+        }
     }
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-        ServiceReference<DataProviderService> serviceReference = bundleContext.getServiceReference(DataProviderService.class);
+        final RemoteDeviceId id = new RemoteDeviceId(getIdentifier());
 
-        DataProviderService dataProviderService =
-                bundleContext.getService(serviceReference);
+        final ExecutorService globalProcessingExecutor = getGlobalProcessingExecutor();
 
-        getDomRegistryDependency();
-        NetconfDevice device = new NetconfDevice(getIdentifier().getInstanceName());
+        final Broker domBroker = getDomRegistryDependency();
+        final BindingAwareBroker bindingBroker = getBindingRegistryBackwards();
 
-        device.setClientConfig(getClientConfig(device));
+        final RemoteDeviceHandler salFacade = new NetconfDeviceSalFacade(id, domBroker, bindingBroker, bundleContext, globalProcessingExecutor);
+        final NetconfDevice device =
+                NetconfDevice.createNetconfDevice(id, getGlobalNetconfSchemaProvider(), globalProcessingExecutor, salFacade);
+        final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(id, device);
+        final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(listener);
 
-        device.setProcessingExecutor(getGlobalProcessingExecutor());
+        // FIXME BUG-944 remove backwards compatibility
+        final NetconfClientDispatcher dispatcher = getClientDispatcher() == null ? createDispatcher() : getClientDispatcherDependency();
+        listener.initializeRemoteConnection(dispatcher, clientConfig);
 
-        device.setEventExecutor(getEventExecutorDependency());
-        device.setDispatcher(getClientDispatcher() == null ? createDispatcher() : getClientDispatcherDependency());
-        device.setSchemaSourceProvider(getGlobalNetconfSchemaProvider(bundleContext));
-        device.setDataProviderService(dataProviderService);
-        getDomRegistryDependency().registerProvider(device, bundleContext);
-        device.start();
-        return device;
+        return new AutoCloseable() {
+            @Override
+            public void close() throws Exception {
+                listener.close();
+                salFacade.close();
+            }
+        };
+    }
+
+    private BindingAwareBroker getBindingRegistryBackwards() {
+        if(getBindingRegistry() != null) {
+            return getBindingRegistryDependency();
+        } else {
+            // FIXME BUG 944 remove backwards compatibility
+            final ServiceReference<BindingAwareBroker> serviceReference = bundleContext.getServiceReference(BindingAwareBroker.class);
+            Preconditions
+                    .checkNotNull(
+                            serviceReference,
+                            "Unable to retrieve %s from OSGi service registry, use binding-registry config property to inject %s with config subsystem",
+                            BindingAwareBroker.class, BindingAwareBroker.class);
+            return bundleContext.getService(serviceReference);
+        }
     }
 
     private ExecutorService getGlobalProcessingExecutor() {
-        return GLOBAL_PROCESSING_EXECUTOR == null ? Executors.newCachedThreadPool() : GLOBAL_PROCESSING_EXECUTOR;
+        if(getProcessingExecutor() != null) {
+            return getProcessingExecutorDependency().getExecutor();
+        } else {
+            // FIXME BUG 944 remove backwards compatibility
+            return Executors.newCachedThreadPool();
+        }
     }
 
-    private synchronized AbstractCachingSchemaSourceProvider<String, InputStream> getGlobalNetconfSchemaProvider(BundleContext bundleContext) {
+    private synchronized AbstractCachingSchemaSourceProvider<String, InputStream> getGlobalNetconfSchemaProvider() {
         if(GLOBAL_NETCONF_SOURCE_PROVIDER == null) {
-            String storageFile = "cache/schema";
+            final String storageFile = "cache/schema";
             //            File directory = bundleContext.getDataFile(storageFile);
-            File directory = new File(storageFile);
-            SchemaSourceProvider<String> defaultProvider = SchemaSourceProviders.noopProvider();
+            final File directory = new File(storageFile);
+            final SchemaSourceProvider<String> defaultProvider = SchemaSourceProviders.noopProvider();
             GLOBAL_NETCONF_SOURCE_PROVIDER = FilesystemSchemaCachingProvider.createFromStringSourceProvider(defaultProvider, directory);
         }
         return GLOBAL_NETCONF_SOURCE_PROVIDER;
@@ -146,20 +181,20 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
         return new NetconfClientDispatcherImpl(getBossThreadGroupDependency(), getWorkerThreadGroupDependency(), new HashedWheelTimer());
     }
 
-    public void setBundleContext(BundleContext bundleContext) {
+    public void setBundleContext(final BundleContext bundleContext) {
         this.bundleContext = bundleContext;
     }
 
-    public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDevice device) {
-        InetSocketAddress socketAddress = getSocketAddress();
-        ReconnectStrategy strategy = getReconnectStrategy();
-        long clientConnectionTimeoutMillis = getConnectionTimeoutMillis();
+    public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener) {
+        final InetSocketAddress socketAddress = getSocketAddress();
+        final ReconnectStrategy strategy = getReconnectStrategy();
+        final long clientConnectionTimeoutMillis = getConnectionTimeoutMillis();
 
         return NetconfReconnectingClientConfigurationBuilder.create()
         .withAddress(socketAddress)
         .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
         .withReconnectStrategy(strategy)
-        .withSessionListener(new NetconfDeviceListener(device))
+        .withSessionListener(listener)
         .withAuthHandler(new LoginPassword(getUsername(),getPassword()))
         .withProtocol(getTcpOnly() ?
                 NetconfClientConfiguration.NetconfClientProtocol.TCP :
@@ -174,19 +209,19 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
     }
 
     private ReconnectStrategy getReconnectStrategy() {
-        Long connectionAttempts;
+        final Long connectionAttempts;
         if (getMaxConnectionAttempts() != null && getMaxConnectionAttempts() > 0) {
             connectionAttempts = getMaxConnectionAttempts();
         } else {
             logger.trace("Setting {} on {} to infinity", maxConnectionAttemptsJmxAttribute, this);
             connectionAttempts = null;
         }
-        double sleepFactor = 1.5;
-        int minSleep = 1000;
-        Long maxSleep = null;
-        Long deadline = null;
+        final double sleepFactor = getSleepFactor().doubleValue();
+        final int minSleep = getBetweenAttemptsTimeoutMillis();
+        final Long maxSleep = null;
+        final Long deadline = null;
 
-        return new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, getBetweenAttemptsTimeoutMillis(),
+        return new TimedReconnectStrategy(getEventExecutorDependency(), getBetweenAttemptsTimeoutMillis(),
                 minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
     }
 
@@ -199,7 +234,7 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
             addressValue = getAddress().getIpv6Address().getValue();
         }
          */
-        InetAddress inetAddress = InetAddresses.forString(getAddress());
+        final InetAddress inetAddress = InetAddresses.forString(getAddress());
         return new InetSocketAddress(inetAddress, getPort().intValue());
     }
 }
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/MessageTransformer.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/MessageTransformer.java
new file mode 100644 (file)
index 0000000..7a392a8
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.api;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
+
+public interface MessageTransformer<M> extends SchemaContextListener {
+
+    CompositeNode toNotification(M message);
+
+    M toRpcRequest(QName rpc, CompositeNode node);
+
+    RpcResult<CompositeNode> toRpcResult(M message, QName rpc);
+
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDevice.java
new file mode 100644 (file)
index 0000000..e0d2433
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.api;
+
+/**
+ *
+ */
+public interface RemoteDevice<PREF, M> {
+
+    void onRemoteSessionUp(PREF remoteSessionCapabilities, RemoteDeviceCommunicator<M> listener);
+
+    void onRemoteSessionDown();
+
+    void onNotification(M notification);
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceCommunicator.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceCommunicator.java
new file mode 100644 (file)
index 0000000..67cb29a
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.api;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public interface RemoteDeviceCommunicator<M> extends AutoCloseable {
+
+    ListenableFuture<RpcResult<M>> sendRequest(M message, QName rpc);
+
+    void close();
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceHandler.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceHandler.java
new file mode 100644 (file)
index 0000000..b2845d5
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.api;
+
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+
+public interface RemoteDeviceHandler<PREF> extends AutoCloseable {
+
+    void onDeviceConnected(SchemaContextProvider remoteSchemaContextProvider,
+                           PREF netconfSessionPreferences, RpcImplementation deviceRpc);
+
+    void onDeviceDisconnected();
+
+    void onNotification(CompositeNode domNotification);
+
+    void close();
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/SchemaContextProviderFactory.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/SchemaContextProviderFactory.java
new file mode 100644 (file)
index 0000000..43577c3
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.api;
+
+import java.io.InputStream;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+
+public interface SchemaContextProviderFactory {
+
+    SchemaContextProvider createContextProvider(Collection<QName> capabilities, SchemaSourceProvider<InputStream> sourceProvider);
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/SchemaSourceProviderFactory.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/SchemaSourceProviderFactory.java
new file mode 100644 (file)
index 0000000..7037231
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.api;
+
+import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+
+public interface SchemaSourceProviderFactory<T> {
+
+    SchemaSourceProvider<T> createSourceProvider(final RpcImplementation deviceRpc);
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/package-info.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/package-info.java
new file mode 100644 (file)
index 0000000..022021d
--- /dev/null
@@ -0,0 +1,6 @@
+/**
+ * General API for remote connectors e.g. netconf connector
+ *
+ * TODO extract into separate bundle when another connector is implemented e.g. restconf connector
+ */
+package org.opendaylight.controller.sal.connect.api;
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/InventoryUtils.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/InventoryUtils.java
deleted file mode 100644 (file)
index e6dc59c..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf;
-
-import java.net.URI;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class InventoryUtils {
-    private static final Logger LOG = LoggerFactory.getLogger(InventoryUtils.class);
-    private static final URI INVENTORY_NAMESPACE = URI.create("urn:opendaylight:inventory");
-    private static final URI NETCONF_INVENTORY_NAMESPACE = URI.create("urn:opendaylight:netconf-node-inventory");
-    private static final Date INVENTORY_REVISION = dateFromString("2013-08-19");
-    private static final Date NETCONF_INVENTORY_REVISION = dateFromString("2014-01-08");
-    public static final QName INVENTORY_NODES = new QName(INVENTORY_NAMESPACE, INVENTORY_REVISION, "nodes");
-    public static final QName INVENTORY_NODE = new QName(INVENTORY_NAMESPACE, INVENTORY_REVISION, "node");
-    public static final QName INVENTORY_ID = new QName(INVENTORY_NAMESPACE, INVENTORY_REVISION, "id");
-    public static final QName INVENTORY_CONNECTED = new QName(NETCONF_INVENTORY_NAMESPACE, NETCONF_INVENTORY_REVISION,
-            "connected");
-    public static final QName NETCONF_INVENTORY_INITIAL_CAPABILITY = new QName(NETCONF_INVENTORY_NAMESPACE,
-            NETCONF_INVENTORY_REVISION, "initial-capability");
-
-    public static final InstanceIdentifier INVENTORY_PATH = InstanceIdentifier.builder().node(INVENTORY_NODES)
-            .toInstance();
-    public static final QName NETCONF_INVENTORY_MOUNT = null;
-
-    private InventoryUtils() {
-        throw new UnsupportedOperationException("Utility class cannot be instantiated");
-    }
-
-    /**
-     * Converts date in string format yyyy-MM-dd to java.util.Date.
-     *
-     * @return java.util.Date conformant to string formatted date yyyy-MM-dd.
-     */
-    private static Date dateFromString(final String date) {
-        // We do not reuse the formatter because it's not thread-safe
-        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
-        try {
-            return formatter.parse(date);
-        } catch (ParseException e) {
-            LOG.error("Failed to parse date {}", date, e);
-            return null;
-        }
-    }
-}
index 94beaed..dca8fca 100644 (file)
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
-import static com.google.common.base.Preconditions.checkState;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
-
-import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
-import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
-import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
-import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.sal.connect.api.MessageTransformer;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory;
+import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
+import org.opendaylight.controller.sal.connect.netconf.schema.NetconfDeviceSchemaProviderFactory;
+import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaSourceProvider;
+import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
-import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
-import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.util.concurrent.EventExecutor;
-
-public class NetconfDevice implements Provider, //
-        DataReader<InstanceIdentifier, CompositeNode>, //
-        DataCommitHandler<InstanceIdentifier, CompositeNode>, //
-        RpcImplementation, //
-        AutoCloseable {
-
-    InetSocketAddress socketAddress;
-
-    MountProvisionInstance mountInstance;
-
-    EventExecutor eventExecutor;
-
-    ExecutorService processingExecutor;
-
-    InstanceIdentifier path;
-
-    ReconnectStrategy reconnectStrategy;
-
-    AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
-
-    private NetconfDeviceSchemaContextProvider deviceContextProvider;
-
-    protected Logger logger;
-
-    Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
-    Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
-    Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
-    List<RpcRegistration> rpcReg;
-
-    String name;
-
-    MountProvisionService mountService;
-
-    NetconfClientDispatcher dispatcher;
-
-    static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
-
-    SchemaSourceProvider<InputStream> remoteSourceProvider;
-
-    private volatile DataBrokerService dataBroker;
-
-    NetconfDeviceListener listener;
-
-    private boolean rollbackSupported;
-
-    private NetconfReconnectingClientConfiguration clientConfig;
-    private volatile DataProviderService dataProviderService;
-
-    public NetconfDevice(String name) {
-        this.name = name;
-        this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
-        this.path = InstanceIdentifier.builder(INVENTORY_PATH)
-                .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
-    }
-
-    public void start() {
-        checkState(dispatcher != null, "Dispatcher must be set.");
-        checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
-        checkState(eventExecutor != null, "Event executor must be set.");
-
-        Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener);
-        listener = (NetconfDeviceListener) clientConfig.getSessionListener();
-
-        logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
-
-        dispatcher.createReconnectingClient(clientConfig);
-    }
-
-    Optional<SchemaContext> getSchemaContext() {
-        if (deviceContextProvider == null) {
-            return Optional.absent();
-        }
-        return deviceContextProvider.currentContext;
-    }
-
-    void bringDown() {
-        if (rpcReg != null) {
-            for (RpcRegistration reg : rpcReg) {
-                reg.close();
-            }
-            rpcReg = null;
-        }
-        closeGracefully(confReaderReg);
-        confReaderReg = null;
-        closeGracefully(operReaderReg);
-        operReaderReg = null;
-        closeGracefully(commitHandlerReg);
-        commitHandlerReg = null;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
-        updateDeviceState(false, Collections.<QName> emptySet());
-    }
+/**
+ *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
+ */
+public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilities, NetconfMessage> {
 
-    private void closeGracefully(final AutoCloseable resource) {
-        if (resource != null) {
-            try {
-                resource.close();
-            } catch (Exception e) {
-                logger.warn("Ignoring exception while closing {}", resource, e);
-            }
-        }
-    }
+    private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
 
-    void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
-        // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
-        // Reason: delegate.getSchema blocks thread when waiting for response
-        // however, if the netty thread is blocked, no incoming message can be processed
-        // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
-        // TODO redesign +refactor
-        processingExecutor.submit(new Runnable() {
-            @Override
-            public void run() {
-                NetconfDevice.this.rollbackSupported = rollbackSupported;
-                remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
-                deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
-                deviceContextProvider.createContextFromCapabilities(capabilities);
-                if (mountInstance != null && getSchemaContext().isPresent()) {
-                    mountInstance.setSchemaContext(getSchemaContext().get());
-                }
+    private final RemoteDeviceId id;
 
-                updateDeviceState(true, capabilities);
+    private final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade;
+    private final ListeningExecutorService processingExecutor;
+    private final MessageTransformer<NetconfMessage> messageTransformer;
+    private final SchemaContextProviderFactory schemaContextProviderFactory;
+    private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
 
-                if (mountInstance != null) {
-                    confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
-                    operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
-                    commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
+    public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
+            final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
+            final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
 
-                    List<RpcRegistration> rpcs = new ArrayList<>();
-                    // TODO same condition twice
-                    if (mountInstance != null && getSchemaContext().isPresent()) {
-                        for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
-                            rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
-                        }
+        return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(),
+                new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory<InputStream>() {
+                    @Override
+                    public SchemaSourceProvider<InputStream> createSourceProvider(final RpcImplementation deviceRpc) {
+                        return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id,
+                                deviceRpc));
                     }
-                    rpcReg = rpcs;
-                }
-            }
-        });
-    }
-
-    private void updateDeviceState(boolean up, Set<QName> capabilities) {
-        checkDataStoreState();
-
-        DataModificationTransaction transaction = dataBroker.beginTransaction();
-
-        CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
-        it.setQName(INVENTORY_NODE);
-        it.addLeaf(INVENTORY_ID, name);
-        it.addLeaf(INVENTORY_CONNECTED, up);
-
-        logger.debug("Client capabilities {}", capabilities);
-        for (QName capability : capabilities) {
-            it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString());
-        }
-
-        logger.debug("Update device state transaction " + transaction.getIdentifier()
-                + " putting operational data started.");
-        transaction.removeOperationalData(path);
-        transaction.putOperationalData(path, it.toInstance());
-        logger.debug("Update device state transaction " + transaction.getIdentifier()
-                + " putting operational data ended.");
-
-        // FIXME: this has to be asynchronous
-        RpcResult<TransactionStatus> transactionStatus = null;
-        try {
-            transactionStatus = transaction.commit().get();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for response", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Read configuration data " + path + " failed", e);
-        }
-        // TODO better ex handling
-
-        if (transactionStatus.isSuccessful()) {
-            logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
-        } else {
-            logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
-            logger.debug("Update device state transaction status " + transaction.getStatus());
-        }
-    }
-
-    @Override
-    public CompositeNode readConfigurationData(InstanceIdentifier path) {
-        RpcResult<CompositeNode> result = null;
-        try {
-            result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
-                    wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for response", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Read configuration data " + path + " failed", e);
-        }
-
-        CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
-        return data == null ? null : (CompositeNode) findNode(data, path);
-    }
-
-    @Override
-    public CompositeNode readOperationalData(InstanceIdentifier path) {
-        RpcResult<CompositeNode> result = null;
-        try {
-            result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for response", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Read configuration data " + path + " failed", e);
-        }
-
-        CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
-        return (CompositeNode) findNode(data, path);
-    }
-
-    @Override
-    public Set<QName> getSupportedRpcs() {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
-        return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
+                });
     }
 
-    @Override
-    public Collection<ProviderFunctionality> getProviderFunctionality() {
-        return Collections.emptySet();
+    @VisibleForTesting
+    protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
+            final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
+            final SchemaContextProviderFactory schemaContextProviderFactory,
+            final SchemaSourceProviderFactory<InputStream> sourceProviderFactory) {
+        this.id = id;
+        this.messageTransformer = messageTransformer;
+        this.salFacade = salFacade;
+        this.sourceProviderFactory = sourceProviderFactory;
+        this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
+        this.schemaContextProviderFactory = schemaContextProviderFactory;
     }
 
     @Override
-    public void onSessionInitiated(ProviderSession session) {
-        dataBroker = session.getService(DataBrokerService.class);
-
-        processingExecutor.submit(new Runnable() {
+    public void onRemoteSessionUp(final NetconfSessionCapabilities remoteSessionCapabilities,
+                                  final RemoteDeviceCommunicator<NetconfMessage> listener) {
+        // SchemaContext setup has to be performed in a dedicated thread since
+        // we are in a netty thread in this method
+        // Yang models are being downloaded in this method and it would cause a
+        // deadlock if we used the netty thread
+        // http://netty.io/wiki/thread-model.html
+        logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
+
+        final ListenableFuture<?> salInitializationFuture = processingExecutor.submit(new Runnable() {
             @Override
             public void run() {
-                updateInitialState();
+                final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener);
+                final SchemaSourceProvider<InputStream> delegate = sourceProviderFactory.createSourceProvider(deviceRpc);
+                final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
+                updateMessageTransformer(schemaContextProvider);
+                salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
             }
         });
 
-        mountService = session.getService(MountProvisionService.class);
-        if (mountService != null) {
-            mountInstance = mountService.createOrGetMountPoint(path);
-        }
-    }
-
-    private void updateInitialState() {
-        checkDataStoreState();
-
-        DataModificationTransaction transaction = dataBroker.beginTransaction();
-        if (operationalNodeNotExisting(transaction)) {
-            transaction.putOperationalData(path, getNodeWithId());
-        }
-        if (configurationNodeNotExisting(transaction)) {
-            transaction.putConfigurationData(path, getNodeWithId());
-        }
-
-        try {
-            transaction.commit().get();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for response", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Read configuration data " + path + " failed", e);
-        }
-    }
-
-    private void checkDataStoreState() {
-        // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore
-        dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder(
-                Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build());    }
-
-    CompositeNode getNodeWithId() {
-        SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
-        return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
-    }
-
-    boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
-        return null == transaction.readConfigurationData(path);
-    }
-
-    boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
-        return null == transaction.readOperationalData(path);
-    }
-
-    static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
-
-        Node<?> current = node;
-        for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
-            if (current instanceof SimpleNode<?>) {
-                return null;
-            } else if (current instanceof CompositeNode) {
-                CompositeNode currentComposite = (CompositeNode) current;
-
-                current = currentComposite.getFirstCompositeByName(arg.getNodeType());
-                if (current == null) {
-                    current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
-                }
-                if (current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.getNodeType());
-                }
-                if (current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
-                }
-                if (current == null) {
-                    return null;
-                }
-            }
-        }
-        return current;
-    }
-
-    @Override
-    public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
-            DataModification<InstanceIdentifier, CompositeNode> modification) {
-        NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
-                modification, true, rollbackSupported);
-        try {
-            twoPhaseCommit.prepare();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting for response", e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Read configuration data " + path + " failed", e);
-        }
-         return twoPhaseCommit;
-    }
-
-    Set<QName> getCapabilities(Collection<String> capabilities) {
-        return FluentIterable.from(capabilities).filter(new Predicate<String>() {
+        Futures.addCallback(salInitializationFuture, new FutureCallback<Object>() {
             @Override
-            public boolean apply(final String capability) {
-                return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
+            public void onSuccess(final Object result) {
+                logger.debug("{}: Initialization in sal successful", id);
+                logger.info("{}: Netconf connector initialized successfully", id);
             }
-        }).transform(new Function<String, QName>() {
-            @Override
-            public QName apply(final String capability) {
-                String[] parts = capability.split("\\?");
-                String namespace = parts[0];
-                FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
-
-                String revision = getStringAndTransform(queryParams, "revision=", "revision=");
 
-                String moduleName = getStringAndTransform(queryParams, "module=", "module=");
-
-                if (revision == null) {
-                    logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
-                    revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
-
-                    if (revision != null) {
-                        logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
-                    }
-                }
-                if (revision == null) {
-                    return QName.create(URI.create(namespace), null, moduleName);
-                }
-                return QName.create(namespace, revision, moduleName);
-            }
-
-            private String getStringAndTransform(final Iterable<String> queryParams, final String match,
-                    final String substringToRemove) {
-                Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
-                    @Override
-                    public boolean apply(final String input) {
-                        return input.startsWith(match);
-                    }
-                });
-
-                return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
+            @Override
+            public void onFailure(final Throwable t) {
+                // Unable to initialize device, set as disconnected
+                logger.error("{}: Initialization failed", id, t);
+                salFacade.onDeviceDisconnected();
             }
-
-        }).toSet();
-    }
-
-    @Override
-    public void close() {
-        bringDown();
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public InetSocketAddress getSocketAddress() {
-        return socketAddress;
-    }
-
-    public MountProvisionInstance getMountInstance() {
-        return mountInstance;
-    }
-
-    public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
-        this.reconnectStrategy = reconnectStrategy;
-    }
-
-    public void setProcessingExecutor(final ExecutorService processingExecutor) {
-        this.processingExecutor = processingExecutor;
-    }
-
-    public void setSocketAddress(final InetSocketAddress socketAddress) {
-        this.socketAddress = socketAddress;
-    }
-
-    public void setEventExecutor(final EventExecutor eventExecutor) {
-        this.eventExecutor = eventExecutor;
-    }
-
-    public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
-        this.schemaSourceProvider = schemaSourceProvider;
-    }
-
-    public void setDispatcher(final NetconfClientDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
+        });
     }
 
-    public void setClientConfig(final NetconfReconnectingClientConfiguration clientConfig) {
-        this.clientConfig = clientConfig;
+    /**
+     * Update initial message transformer to use retrieved schema
+     */
+    private void updateMessageTransformer(final SchemaContextProvider schemaContextProvider) {
+        messageTransformer.onGlobalContextUpdated(schemaContextProvider.getSchemaContext());
     }
 
-    public void setDataProviderService(final DataProviderService dataProviderService) {
-        this.dataProviderService = dataProviderService;
+    private SchemaContextProvider setUpSchemaContext(final SchemaSourceProvider<InputStream> sourceProvider, final NetconfSessionCapabilities capabilities) {
+        return schemaContextProviderFactory.createContextProvider(capabilities.getModuleBasedCaps(), sourceProvider);
     }
-}
-
-class NetconfDeviceSchemaContextProvider {
-
-    NetconfDevice device;
-
-    SchemaSourceProvider<InputStream> sourceProvider;
 
-    Optional<SchemaContext> currentContext;
-
-    NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
-        this.device = device;
-        this.sourceProvider = sourceProvider;
-        this.currentContext = Optional.absent();
+    private NetconfDeviceRpc setUpDeviceRpc(final NetconfSessionCapabilities capHolder, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+        Preconditions.checkArgument(capHolder.isMonitoringSupported(),
+                "%s: Netconf device does not support netconf monitoring, yang schemas cannot be acquired. Netconf device capabilities", capHolder);
+        return new NetconfDeviceRpc(listener, messageTransformer);
     }
 
-    void createContextFromCapabilities(Iterable<QName> capabilities) {
-        YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
-        if (!sourceContext.getMissingSources().isEmpty()) {
-            device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
-        }
-        device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
-        List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
-        if (!sourceContext.getValidSources().isEmpty()) {
-            SchemaContext schemaContext = tryToCreateContext(modelsToParse);
-            currentContext = Optional.fromNullable(schemaContext);
-        } else {
-            currentContext = Optional.absent();
-        }
-        if (currentContext.isPresent()) {
-            device.logger.debug("Schema context successfully created.");
-        }
+    @Override
+    public void onRemoteSessionDown() {
+        salFacade.onDeviceDisconnected();
     }
 
-    SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
-        YangParserImpl parser = new YangParserImpl();
-        try {
-
-            Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
-            return parser.resolveSchemaContext(models);
-        } catch (Exception e) {
-            device.logger.debug("Error occured during parsing YANG schemas", e);
-            return null;
-        }
+    @Override
+    public void onNotification(final NetconfMessage notification) {
+        final CompositeNode parsedNotification = messageTransformer.toNotification(notification);
+        salFacade.onNotification(parsedNotification);
     }
 }
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java
deleted file mode 100644 (file)
index 68667f0..0000000
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf;
-
-import com.google.common.collect.Sets;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-
-import java.util.ArrayDeque;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.Set;
-
-import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
-import org.opendaylight.controller.netconf.client.NetconfClientSession;
-import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
-import org.opendaylight.controller.netconf.util.xml.XmlElement;
-import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-public class NetconfDeviceListener implements NetconfClientSessionListener {
-    private static final class Request {
-        final UncancellableFuture<RpcResult<CompositeNode>> future;
-        final NetconfMessage request;
-        final QName rpc;
-
-        private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request, final QName rpc) {
-            this.future = future;
-            this.request = request;
-            this.rpc = rpc;
-        }
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class);
-    private final Queue<Request> requests = new ArrayDeque<>();
-    private final NetconfDevice device;
-    private NetconfClientSession session;
-
-    public NetconfDeviceListener(final NetconfDevice device) {
-        this.device = Preconditions.checkNotNull(device);
-    }
-
-    @Override
-    public synchronized void onSessionUp(final NetconfClientSession session) {
-        LOG.debug("Session with {} established as address {} session-id {}",
-                device.getName(), device.getSocketAddress(), session.getSessionId());
-
-        this.session = session;
-
-        final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
-        LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
-
-        // Select the appropriate provider
-        final SchemaSourceProvider<String> delegate;
-        if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
-            delegate = new NetconfRemoteSchemaSourceProvider(device);
-            // FIXME caps do not contain urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring, since it is filtered out in getCapabilitites
-        } else if(session.getServerCapabilities().contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
-            delegate = new NetconfRemoteSchemaSourceProvider(device);
-        } else {
-            LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
-            delegate = SchemaSourceProviders.noopProvider();
-        }
-
-        device.bringUp(delegate, caps, isRollbackSupported(session.getServerCapabilities()));
-
-    }
-
-    private static boolean isRollbackSupported(final Collection<String> serverCapabilities) {
-        // TODO rollback capability cannot be searched for in Set<QName> caps
-        // since this set does not contain module-less capabilities
-        return Sets.newHashSet(serverCapabilities).contains(NetconfMapping.NETCONF_ROLLBACK_ON_ERROR_URI.toString());
-    }
-
-    private synchronized void tearDown(final Exception e) {
-        session = null;
-
-        /*
-         * Walk all requests, check if they have been executing
-         * or cancelled and remove them from the queue.
-         */
-        final Iterator<Request> it = requests.iterator();
-        while (it.hasNext()) {
-            final Request r = it.next();
-            if (r.future.isUncancellable()) {
-                // FIXME: add a RpcResult instead?
-                r.future.setException(e);
-                it.remove();
-            } else if (r.future.isCancelled()) {
-                // This just does some house-cleaning
-                it.remove();
-            }
-        }
-
-        device.bringDown();
-    }
-
-    @Override
-    public void onSessionDown(final NetconfClientSession session, final Exception e) {
-        LOG.debug("Session with {} went down", device.getName(), e);
-        tearDown(e);
-    }
-
-    @Override
-    public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
-        LOG.debug("Session with {} terminated {}", session, reason);
-        tearDown(new RuntimeException(reason.getErrorMessage()));
-    }
-
-    @Override
-    public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
-        /*
-         * Dispatch between notifications and messages. Messages need to be processed
-         * with lock held, notifications do not.
-         */
-        if (isNotification(message)) {
-            processNotification(message);
-        } else {
-            processMessage(message);
-        }
-    }
-
-    private synchronized void processMessage(final NetconfMessage message) {
-        final Request r = requests.peek();
-        if (r.future.isUncancellable()) {
-            requests.poll();
-            LOG.debug("Matched {} to {}", r.request, message);
-
-            try {
-                NetconfMapping.checkValidReply(r.request, message);
-            } catch (IllegalStateException e) {
-                LOG.warn("Invalid request-reply match, reply message contains different message-id", e);
-                r.future.setException(e);
-                return;
-            }
-
-            try {
-                NetconfMapping.checkSuccessReply(message);
-            } catch (NetconfDocumentedException | IllegalStateException e) {
-                LOG.warn("Error reply from remote device", e);
-                r.future.setException(e);
-                return;
-            }
-
-            r.future.set(NetconfMapping.toRpcResult(message, r.rpc, device.getSchemaContext()));
-        } else {
-            LOG.warn("Ignoring unsolicited message", message);
-        }
-    }
-
-    synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message, final QName rpc) {
-        if (session == null) {
-            LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
-            return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
-                @Override
-                public boolean isSuccessful() {
-                    return false;
-                }
-
-                @Override
-                public CompositeNode getResult() {
-                    return null;
-                }
-
-                @Override
-                public Collection<RpcError> getErrors() {
-                    // FIXME: indicate that the session is down
-                    return Collections.emptySet();
-                }
-            });
-        }
-
-        final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message, rpc);
-        requests.add(req);
-
-        session.sendMessage(req.request).addListener(new FutureListener<Void>() {
-            @Override
-            public void operationComplete(final Future<Void> future) throws Exception {
-                if (!future.isSuccess()) {
-                    // We expect that a session down will occur at this point
-                    LOG.debug("Failed to send request {}", XmlUtil.toString(req.request.getDocument()), future.cause());
-                    req.future.setException(future.cause());
-                } else {
-                    LOG.trace("Finished sending request {}", req.request);
-                }
-            }
-        });
-
-        return req.future;
-    }
-
-    /**
-     * Process an incoming notification.
-     *
-     * @param notification Notification message
-     */
-    private void processNotification(final NetconfMessage notification) {
-        this.device.logger.debug("Received NETCONF notification.", notification);
-        CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext());
-        if (domNotification == null) {
-            return;
-        }
-
-        MountProvisionInstance mountInstance =  this.device.getMountInstance();
-        if (mountInstance != null) {
-            mountInstance.publish(domNotification);
-        }
-    }
-
-    private static boolean isNotification(final NetconfMessage message) {
-        final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
-        return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
-    }
-}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java
deleted file mode 100644 (file)
index 34cd9aa..0000000
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf;
-
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_CANDIDATE_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_COMMIT_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_CONFIG_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_EDIT_CONFIG_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_ERROR_OPTION_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_OPERATION_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_RUNNING_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_TARGET_QNAME;
-import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.ROLLBACK_ON_ERROR_OPTION;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.DataModification;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTwoPhaseCommitTransaction.class);
-    private final DataModification<InstanceIdentifier, CompositeNode> modification;
-    private final NetconfDevice device;
-    private final boolean candidateSupported;
-    private final boolean rollbackSupported;
-
-    public NetconfDeviceTwoPhaseCommitTransaction(final NetconfDevice device,
-            final DataModification<InstanceIdentifier, CompositeNode> modification,
-            final boolean candidateSupported, final boolean rollbackOnErrorSupported) {
-        this.device = Preconditions.checkNotNull(device);
-        this.modification = Preconditions.checkNotNull(modification);
-        this.candidateSupported = candidateSupported;
-        this.rollbackSupported = rollbackOnErrorSupported;
-    }
-
-    void prepare() throws InterruptedException, ExecutionException {
-        for (InstanceIdentifier toRemove : modification.getRemovedConfigurationData()) {
-            sendDelete(toRemove);
-        }
-        for(Entry<InstanceIdentifier, CompositeNode> toUpdate : modification.getUpdatedConfigurationData().entrySet()) {
-            sendMerge(toUpdate.getKey(),toUpdate.getValue());
-        }
-    }
-
-    private void sendMerge(final InstanceIdentifier key, final CompositeNode value) throws InterruptedException, ExecutionException {
-        sendEditRpc(createEditStructure(key, Optional.<String>absent(), Optional.of(value)));
-    }
-
-    private void sendDelete(final InstanceIdentifier toDelete) throws InterruptedException, ExecutionException {
-        sendEditRpc(createEditStructure(toDelete, Optional.of("delete"), Optional.<CompositeNode> absent()));
-    }
-
-    private void sendEditRpc(final CompositeNode editStructure) throws InterruptedException, ExecutionException {
-        CompositeNodeBuilder<ImmutableCompositeNode> builder = configurationRpcBuilder();
-        builder.setQName(NETCONF_EDIT_CONFIG_QNAME);
-        builder.add(editStructure);
-
-        RpcResult<CompositeNode> rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance()).get();
-        Preconditions.checkState(rpcResult.isSuccessful(),"Rpc Result was unsuccessful");
-    }
-
-    private CompositeNodeBuilder<ImmutableCompositeNode> configurationRpcBuilder() {
-        CompositeNodeBuilder<ImmutableCompositeNode> ret = ImmutableCompositeNode.builder();
-
-        Node<?> targetNode;
-        if(candidateSupported) {
-            targetNode = ImmutableCompositeNode.create(NETCONF_CANDIDATE_QNAME, ImmutableList.<Node<?>>of());
-        } else {
-            targetNode = ImmutableCompositeNode.create(NETCONF_RUNNING_QNAME, ImmutableList.<Node<?>>of());
-        }
-
-        Node<?> targetWrapperNode = ImmutableCompositeNode.create(NETCONF_TARGET_QNAME, ImmutableList.<Node<?>>of(targetNode));
-
-        if(rollbackSupported) {
-            LOG.debug("Rollback-on-error supported, setting {} to {}", NETCONF_ERROR_OPTION_QNAME, ROLLBACK_ON_ERROR_OPTION);
-            ret.addLeaf(NETCONF_ERROR_OPTION_QNAME, ROLLBACK_ON_ERROR_OPTION);
-        }
-
-        ret.add(targetWrapperNode);
-        return ret;
-    }
-
-    private CompositeNode createEditStructure(final InstanceIdentifier dataPath, final Optional<String> operation,
-            final Optional<CompositeNode> lastChildOverride) {
-        List<PathArgument> path = dataPath.getPath();
-        List<PathArgument> reversed = Lists.reverse(path);
-        CompositeNode previous = null;
-        boolean isLast = true;
-        for (PathArgument arg : reversed) {
-            CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
-            builder.setQName(arg.getNodeType());
-            Map<QName, Object> predicates = Collections.emptyMap();
-            if (arg instanceof NodeIdentifierWithPredicates) {
-                predicates = ((NodeIdentifierWithPredicates) arg).getKeyValues();
-            }
-            for (Entry<QName, Object> entry : predicates.entrySet()) {
-                builder.addLeaf(entry.getKey(), entry.getValue());
-            }
-
-            if (isLast) {
-                if (operation.isPresent()) {
-                    builder.setAttribute(NETCONF_OPERATION_QNAME, operation.get());
-                }
-                if (lastChildOverride.isPresent()) {
-                    List<Node<?>> children = lastChildOverride.get().getValue();
-                    for(Node<?> child : children) {
-                        if(!predicates.containsKey(child.getKey())) {
-                            builder.add(child);
-                        }
-                    }
-
-                }
-            } else {
-                builder.add(previous);
-            }
-            previous = builder.toInstance();
-            isLast = false;
-        }
-        return ImmutableCompositeNode.create(NETCONF_CONFIG_QNAME, ImmutableList.<Node<?>>of(previous));
-    }
-
-    @Override
-    public RpcResult<Void> finish() {
-        CompositeNodeBuilder<ImmutableCompositeNode> commitInput = ImmutableCompositeNode.builder();
-        commitInput.setQName(NETCONF_COMMIT_QNAME);
-        try {
-            final RpcResult<?> rpcResult = device.invokeRpc(NetconfMapping.NETCONF_COMMIT_QNAME, commitInput.toInstance()).get();
-            return new RpcResult<Void>() {
-
-                @Override
-                public boolean isSuccessful() {
-                    return rpcResult.isSuccessful();
-                }
-
-                @Override
-                public Void getResult() {
-                    return null;
-                }
-
-                @Override
-                public Collection<RpcError> getErrors() {
-                    return rpcResult.getErrors();
-                }
-            };
-        } catch (final InterruptedException | ExecutionException e) {
-            LOG.warn("Failed to finish operation", e);
-            return new RpcResult<Void>() {
-                @Override
-                public boolean isSuccessful() {
-                    return false;
-                }
-
-                @Override
-                public Void getResult() {
-                    return null;
-                }
-
-                @Override
-                public Collection<RpcError> getErrors() {
-                    // FIXME: wrap the exception
-                    return Collections.emptySet();
-                }
-            };
-        }
-    }
-
-    @Override
-    public DataModification<InstanceIdentifier, CompositeNode> getModification() {
-        return this.modification;
-    }
-
-    @Override
-    public RpcResult<Void> rollback() throws IllegalStateException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfInventoryUtils.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfInventoryUtils.java
deleted file mode 100644 (file)
index b68f18f..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf;
-
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-
-public class NetconfInventoryUtils {
-    public static final QName NETCONF_MOUNT = null;
-    public static final QName NETCONF_ENDPOINT = null;
-    public static final QName NETCONF_ENDPOINT_ADDRESS = null;
-    public static final QName NETCONF_ENDPOINT_PORT = null;
-
-    private NetconfInventoryUtils() {
-        throw new UnsupportedOperationException("Utility class cannot be instantiated");
-    }
-
-    public static String getEndpointAddress(CompositeNode node) {
-        return node.getCompositesByName(NETCONF_ENDPOINT).get(0).getFirstSimpleByName(NETCONF_ENDPOINT_ADDRESS).getValue().toString();
-    }
-
-    public static String getEndpointPort(CompositeNode node) {
-        return node.getCompositesByName(NETCONF_ENDPOINT).get(0).getFirstSimpleByName(NETCONF_ENDPOINT_PORT).getValue().toString();
-    }
-}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java
deleted file mode 100644 (file)
index 31c6bd0..0000000
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connect.netconf;
-
-import java.util.Collection;
-import java.util.concurrent.ExecutionException;
-
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider<String> {
-
-    public static final QName IETF_NETCONF_MONITORING = QName.create(
-            "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring", "2010-10-04", "ietf-netconf-monitoring");
-    public static final QName GET_SCHEMA_QNAME = QName.create(IETF_NETCONF_MONITORING, "get-schema");
-    public static final QName GET_DATA_QNAME = QName.create(IETF_NETCONF_MONITORING, "data");
-
-    private final NetconfDevice device;
-
-    private final Logger logger;
-
-    public NetconfRemoteSchemaSourceProvider(NetconfDevice device) {
-        this.device = Preconditions.checkNotNull(device);
-        logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + device.getName());
-    }
-
-    @Override
-    public Optional<String> getSchemaSource(String moduleName, Optional<String> revision) {
-        CompositeNodeBuilder<ImmutableCompositeNode> request = ImmutableCompositeNode.builder(); //
-        request.setQName(GET_SCHEMA_QNAME) //
-                .addLeaf("format", "yang") //
-                .addLeaf("identifier", moduleName); //
-        if (revision.isPresent()) {
-            request.addLeaf("version", revision.get());
-        }
-
-        logger.trace("Loading YANG schema source for {}:{}", moduleName, revision);
-        try {
-            RpcResult<CompositeNode> schemaReply = device.invokeRpc(GET_SCHEMA_QNAME, request.toInstance()).get();
-            if (schemaReply.isSuccessful()) {
-                String schemaBody = getSchemaFromRpc(schemaReply.getResult());
-                if (schemaBody != null) {
-                    device.logger.trace("YANG Schema successfully retrieved from remote for {}:{}", moduleName, revision);
-                    return Optional.of(schemaBody);
-                }
-            }
-            logger.warn("YANG shcema was not successfully retrieved. Errors: {}", schemaReply.getErrors());
-        } catch (InterruptedException | ExecutionException e) {
-            logger.warn("YANG shcema was not successfully retrieved.", e);
-        }
-        return Optional.absent();
-    }
-
-    private String getSchemaFromRpc(CompositeNode result) {
-        if (result == null) {
-            return null;
-        }
-        SimpleNode<?> simpleNode = result.getFirstSimpleByName(GET_DATA_QNAME.withoutRevision());
-        Object potential = simpleNode.getValue();
-        if (potential instanceof String) {
-            return (String) potential;
-        }
-        return null;
-    }
-
-    public static final boolean isSupportedFor(Collection<QName> capabilities) {
-        return capabilities.contains(IETF_NETCONF_MONITORING);
-    }
-}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java
new file mode 100644 (file)
index 0000000..fa6e252
--- /dev/null
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.listener;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Queue;
+
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.connect.util.FailedRpcResult;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+
+public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator<NetconfMessage> {
+
+    private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
+
+    private static final RpcResult<NetconfMessage> FAILED_RPC_RESULT = new FailedRpcResult<>(RpcErrors.getRpcError(
+            null, null, null, RpcError.ErrorSeverity.ERROR, "Netconf session disconnected",
+            RpcError.ErrorType.PROTOCOL, null));
+
+    private final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice;
+    private final RemoteDeviceId id;
+
+    public NetconfDeviceCommunicator(final RemoteDeviceId id,
+            final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice) {
+        this.id = id;
+        this.remoteDevice = remoteDevice;
+    }
+
+    private final Queue<Request> requests = new ArrayDeque<>();
+    private NetconfClientSession session;
+
+    @Override
+    public synchronized void onSessionUp(final NetconfClientSession session) {
+        logger.debug("{}: Session established", id);
+        this.session = session;
+
+        final NetconfSessionCapabilities netconfSessionCapabilities = NetconfSessionCapabilities.fromNetconfSession(session);
+        logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionCapabilities);
+
+        remoteDevice.onRemoteSessionUp(netconfSessionCapabilities, this);
+    }
+
+    public void initializeRemoteConnection(final NetconfClientDispatcher dispatch,
+                                           final NetconfReconnectingClientConfiguration config) {
+        dispatch.createReconnectingClient(config);
+    }
+
+    private synchronized void tearDown(final Exception e) {
+        remoteDevice.onRemoteSessionDown();
+        session = null;
+
+        /*
+         * Walk all requests, check if they have been executing
+         * or cancelled and remove them from the queue.
+         */
+        final Iterator<Request> it = requests.iterator();
+        while (it.hasNext()) {
+            final Request r = it.next();
+            if (r.future.isUncancellable()) {
+                r.future.setException(e);
+                it.remove();
+            } else if (r.future.isCancelled()) {
+                // This just does some house-cleaning
+                it.remove();
+            }
+        }
+    }
+
+    @Override
+    public void onSessionDown(final NetconfClientSession session, final Exception e) {
+        logger.warn("{}: Session went down", id, e);
+        tearDown(e);
+    }
+
+    @Override
+    public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
+        logger.warn("{}: Session terminated {}", id, reason);
+        tearDown(new RuntimeException(reason.getErrorMessage()));
+    }
+
+    @Override
+    public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
+        /*
+         * Dispatch between notifications and messages. Messages need to be processed
+         * with lock held, notifications do not.
+         */
+        if (isNotification(message)) {
+            processNotification(message);
+        } else {
+            processMessage(message);
+        }
+    }
+
+    private synchronized void processMessage(final NetconfMessage message) {
+        final Request r = requests.peek();
+        if (r.future.isUncancellable()) {
+            requests.poll();
+
+            logger.debug("{}: Message received {}", id, message);
+
+            if(logger.isTraceEnabled()) {
+                logger.trace("{}: Matched request: {} to response: {}", id, msgToS(r.request), msgToS(message));
+            }
+
+            try {
+                NetconfMessageTransformUtil.checkValidReply(r.request, message);
+            } catch (final IllegalStateException e) {
+                logger.warn("{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}", id,
+                        msgToS(r.request), msgToS(message), e);
+                r.future.setException(e);
+                return;
+            }
+
+            try {
+                NetconfMessageTransformUtil.checkSuccessReply(message);
+            } catch (NetconfDocumentedException | IllegalStateException e) {
+                logger.warn("{}: Error reply from remote device, request: {}, response: {}", id,
+                        msgToS(r.request), msgToS(message), e);
+                r.future.setException(e);
+                return;
+            }
+
+            r.future.set(Rpcs.getRpcResult(true, message, Collections.<RpcError>emptySet()));
+        } else {
+            logger.warn("{}: Ignoring unsolicited message {}", id, msgToS(message));
+        }
+    }
+
+    @Override
+    public void close() {
+        tearDown(new RuntimeException("Closed"));
+    }
+
+    private static String msgToS(final NetconfMessage msg) {
+        return XmlUtil.toString(msg.getDocument());
+    }
+
+    @Override
+    public synchronized ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
+        if(logger.isTraceEnabled()) {
+            logger.trace("{}: Sending message {}", id, msgToS(message));
+        }
+
+        if (session == null) {
+            logger.warn("{}: Session is disconnected, failing RPC request {}", id, message);
+            return Futures.immediateFuture(FAILED_RPC_RESULT);
+        }
+
+        final Request req = new Request(new UncancellableFuture<RpcResult<NetconfMessage>>(true), message, rpc);
+        requests.add(req);
+
+        session.sendMessage(req.request).addListener(new FutureListener<Void>() {
+            @Override
+            public void operationComplete(final Future<Void> future) throws Exception {
+                if (!future.isSuccess()) {
+                    // We expect that a session down will occur at this point
+                    logger.debug("{}: Failed to send request {}", id, XmlUtil.toString(req.request.getDocument()), future.cause());
+                    req.future.setException(future.cause());
+                } else {
+                    logger.trace("{}: Finished sending request {}", id, req.request);
+                }
+            }
+        });
+
+        return req.future;
+    }
+
+    private void processNotification(final NetconfMessage notification) {
+        logger.debug("{}: Notification received: {}", id, notification);
+
+        if(logger.isTraceEnabled()) {
+            logger.trace("{}: Notification received: {}", id, msgToS(notification));
+        }
+
+        remoteDevice.onNotification(notification);
+    }
+
+    private static boolean isNotification(final NetconfMessage message) {
+        final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
+        return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
+    }
+
+    private static final class Request {
+        final UncancellableFuture<RpcResult<NetconfMessage>> future;
+        final NetconfMessage request;
+        final QName rpc;
+
+        private Request(final UncancellableFuture<RpcResult<NetconfMessage>> future, final NetconfMessage request, final QName rpc) {
+            this.future = future;
+            this.request = request;
+            this.rpc = rpc;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilities.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilities.java
new file mode 100644 (file)
index 0000000..82903ea
--- /dev/null
@@ -0,0 +1,111 @@
+package org.opendaylight.controller.sal.connect.netconf.listener;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+
+import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+public final class NetconfSessionCapabilities {
+
+    private static final Logger logger = LoggerFactory.getLogger(NetconfSessionCapabilities.class);
+
+    private final Set<String> capabilities;
+
+    private final Set<QName> moduleBasedCaps;
+
+    private NetconfSessionCapabilities(final Set<String> capabilities, final Set<QName> moduleBasedCaps) {
+        this.capabilities = capabilities;
+        this.moduleBasedCaps = moduleBasedCaps;
+    }
+
+    public Set<QName> getModuleBasedCaps() {
+        return moduleBasedCaps;
+    }
+
+    public boolean containsCapability(final String capability) {
+        return capabilities.contains(capability);
+    }
+
+    public boolean containsCapability(final QName capability) {
+        return moduleBasedCaps.contains(capability);
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("capabilities", capabilities)
+                .add("rollback", isRollbackSupported())
+                .add("monitoring", isMonitoringSupported())
+                .toString();
+    }
+
+    public boolean isRollbackSupported() {
+        return containsCapability(NetconfMessageTransformUtil.NETCONF_ROLLBACK_ON_ERROR_URI.toString());
+    }
+
+    public boolean isMonitoringSupported() {
+        return containsCapability(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING)
+                || containsCapability(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING.getNamespace().toString());
+    }
+
+    public static NetconfSessionCapabilities fromNetconfSession(final NetconfClientSession session) {
+        return fromStrings(session.getServerCapabilities());
+    }
+
+    public static NetconfSessionCapabilities fromStrings(final Collection<String> capabilities) {
+        final Set<QName> moduleBasedCaps = Sets.newHashSet();
+
+        for (final String capability : capabilities) {
+            if(isModuleBasedCapability(capability)) {
+                final String[] parts = capability.split("\\?");
+                final String namespace = parts[0];
+                final FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
+
+                String revision = getStringAndTransform(queryParams, "revision=", "revision=");
+
+                final String moduleName = getStringAndTransform(queryParams, "module=", "module=");
+
+                if (revision == null) {
+                    logger.debug("Netconf device was not reporting revision correctly, trying to get amp;revision=");
+                    revision = getStringAndTransform(queryParams, "amp;revision=", "amp;revision=");
+
+                    if (revision == null) {
+                        logger.warn("Netconf device returned revision incorrectly escaped for {}", capability);
+                    }
+                }
+                moduleBasedCaps.add(QName.create(namespace, revision, moduleName));
+            }
+        }
+
+        return new NetconfSessionCapabilities(Sets.newHashSet(capabilities), moduleBasedCaps);
+    }
+
+    private static boolean isModuleBasedCapability(final String capability) {
+        return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
+    }
+
+    private static String getStringAndTransform(final Iterable<String> queryParams, final String match,
+                                                final String substringToRemove) {
+        final Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
+            @Override
+            public boolean apply(final String input) {
+                return input.startsWith(match);
+            }
+        });
+
+        return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
+    }
+
+}
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.sal.connect.netconf;
+package org.opendaylight.controller.sal.connect.netconf.listener;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -17,7 +17,7 @@ final class UncancellableFuture<V> extends AbstractFuture<V> {
     @GuardedBy("this")
     private boolean uncancellable = false;
 
-    public UncancellableFuture(boolean uncancellable) {
+    public UncancellableFuture(final boolean uncancellable) {
         this.uncancellable = uncancellable;
     }
 
@@ -35,23 +35,19 @@ final class UncancellableFuture<V> extends AbstractFuture<V> {
     }
 
     @Override
-    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
-        if (uncancellable) {
-            return false;
-        }
-
-        return super.cancel(mayInterruptIfRunning);
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        return uncancellable ? false : super.cancel(mayInterruptIfRunning);
     }
 
     @Override
-    public synchronized boolean set(@Nullable V value) {
-        Preconditions.checkState(uncancellable == true);
+    public synchronized boolean set(@Nullable final V value) {
+        Preconditions.checkState(uncancellable);
         return super.set(value);
     }
 
     @Override
-    protected boolean setException(Throwable throwable) {
-        Preconditions.checkState(uncancellable == true);
+    protected boolean setException(final Throwable throwable) {
+        Preconditions.checkState(uncancellable);
         return super.setException(throwable);
     }
 }
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/package-info.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/package-info.java
new file mode 100644 (file)
index 0000000..4a95582
--- /dev/null
@@ -0,0 +1,4 @@
+/**
+ * Implementation of netconf southbound connector
+ */
+package org.opendaylight.controller.sal.connect.netconf;
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceCommitHandler.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceCommitHandler.java
new file mode 100644 (file)
index 0000000..457b8c3
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.connect.util.FailedRpcResult;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NetconfDeviceCommitHandler implements DataCommitHandler<InstanceIdentifier,CompositeNode> {
+
+    private static final Logger logger= LoggerFactory.getLogger(NetconfDeviceCommitHandler.class);
+
+    private final RemoteDeviceId id;
+    private final RpcImplementation rpc;
+    private final boolean rollbackSupported;
+
+    public NetconfDeviceCommitHandler(final RemoteDeviceId id, final RpcImplementation rpc, final boolean rollbackSupported) {
+        this.id = id;
+        this.rpc = rpc;
+        this.rollbackSupported = rollbackSupported;
+    }
+
+    @Override
+    public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
+            final DataModification<InstanceIdentifier, CompositeNode> modification) {
+
+        final NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(id, rpc,
+                modification, true, rollbackSupported);
+        try {
+            twoPhaseCommit.prepare();
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(id + ": Interrupted while waiting for response", e);
+        } catch (final ExecutionException e) {
+            logger.warn("%s: Error executing pre commit operation on remote device", id, e);
+            return new FailingTransaction(twoPhaseCommit, e);
+        }
+
+        return twoPhaseCommit;
+    }
+
+    /**
+     * Always fail commit transaction that rolls back delegate transaction afterwards
+     */
+    private class FailingTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
+        private final NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit;
+        private final ExecutionException e;
+
+        public FailingTransaction(final NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit, final ExecutionException e) {
+            this.twoPhaseCommit = twoPhaseCommit;
+            this.e = e;
+        }
+
+        @Override
+        public DataModification<InstanceIdentifier, CompositeNode> getModification() {
+            return twoPhaseCommit.getModification();
+        }
+
+        @Override
+        public RpcResult<Void> finish() throws IllegalStateException {
+            return new FailedRpcResult<>(RpcErrors.getRpcError(null, null, null, RpcError.ErrorSeverity.ERROR,
+                    id + ": Unexpected operation error during pre-commit operations", RpcError.ErrorType.APPLICATION, e));
+        }
+
+        @Override
+        public RpcResult<Void> rollback() throws IllegalStateException {
+            return twoPhaseCommit.rollback();
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDataReader.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDataReader.java
new file mode 100644 (file)
index 0000000..2909bac
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.CONFIG_SOURCE_RUNNING;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_DATA_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.toFilterStructure;
+
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+
+public final class NetconfDeviceDataReader implements DataReader<InstanceIdentifier,CompositeNode> {
+
+    private final RpcImplementation rpc;
+    private final RemoteDeviceId id;
+
+    public NetconfDeviceDataReader(final RemoteDeviceId id, final RpcImplementation rpc) {
+        this.id = id;
+        this.rpc = rpc;
+    }
+
+    @Override
+    public CompositeNode readConfigurationData(final InstanceIdentifier path) {
+        final RpcResult<CompositeNode> result;
+        try {
+            result = rpc.invokeRpc(NETCONF_GET_CONFIG_QNAME,
+                    NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
+        } catch (final InterruptedException e) {
+            throw onInterruptedException(e);
+        } catch (final ExecutionException e) {
+            throw new RuntimeException(id + ": Read configuration data " + path + " failed", e);
+        }
+
+        final CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
+        return data == null ? null : (CompositeNode) findNode(data, path);
+    }
+
+    private RuntimeException onInterruptedException(final InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return new RuntimeException(id + ": Interrupted while waiting for response", e);
+    }
+
+    @Override
+    public CompositeNode readOperationalData(final InstanceIdentifier path) {
+        final RpcResult<CompositeNode> result;
+        try {
+            result = rpc.invokeRpc(NETCONF_GET_QNAME, NetconfMessageTransformUtil.wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
+        } catch (final InterruptedException e) {
+            throw onInterruptedException(e);
+        } catch (final ExecutionException e) {
+            throw new RuntimeException(id + ": Read operational data " + path + " failed", e);
+        }
+
+        final CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
+        return (CompositeNode) findNode(data, path);
+    }
+
+    private static Node<?> findNode(final CompositeNode node, final InstanceIdentifier identifier) {
+
+        Node<?> current = node;
+        for (final InstanceIdentifier.PathArgument arg : identifier.getPath()) {
+            if (current instanceof SimpleNode<?>) {
+                return null;
+            } else if (current instanceof CompositeNode) {
+                final CompositeNode currentComposite = (CompositeNode) current;
+
+                current = currentComposite.getFirstCompositeByName(arg.getNodeType());
+                if (current == null) {
+                    current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
+                }
+                if (current == null) {
+                    current = currentComposite.getFirstSimpleByName(arg.getNodeType());
+                }
+                if (current == null) {
+                    current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
+                }
+                if (current == null) {
+                    return null;
+                }
+            }
+        }
+        return current;
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDatastoreAdapter.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDatastoreAdapter.java
new file mode 100644 (file)
index 0000000..e491496
--- /dev/null
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNodeBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Asynchronous (Binding-aware) adapter over datastore subtree for netconf device.
+ *
+ * All data changes are submitted to an ExecutorService to avoid Thread blocking while sal is waiting for schema.
+ */
+final class NetconfDeviceDatastoreAdapter implements AutoCloseable {
+
+    private static final Logger logger  = LoggerFactory.getLogger(NetconfDeviceDatastoreAdapter.class);
+
+    private final RemoteDeviceId id;
+    private final DataProviderService dataService;
+    private final ListeningExecutorService executor;
+
+    NetconfDeviceDatastoreAdapter(final RemoteDeviceId deviceId, final DataProviderService dataService,
+            final ExecutorService executor) {
+        this.id = Preconditions.checkNotNull(deviceId);
+        this.dataService = Preconditions.checkNotNull(dataService);
+        this.executor = MoreExecutors.listeningDecorator(Preconditions.checkNotNull(executor));
+
+        // Initial data change scheduled
+        submitDataChangeToExecutor(this.executor, new Runnable() {
+            @Override
+            public void run() {
+                initDeviceData();
+            }
+        }, deviceId);
+    }
+
+    public void updateDeviceState(final boolean up, final Set<QName> capabilities) {
+        submitDataChangeToExecutor(this.executor, new Runnable() {
+            @Override
+            public void run() {
+                updateDeviceStateInternal(up, capabilities);
+            }
+        }, id);
+    }
+
+    private void updateDeviceStateInternal(final boolean up, final Set<QName> capabilities) {
+        final org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node data = buildDataForDeviceState(
+                up, capabilities, id);
+
+        final DataModificationTransaction transaction = dataService.beginTransaction();
+        logger.trace("{}: Update device state transaction {} putting operational data started.", id, transaction.getIdentifier());
+        transaction.removeOperationalData(id.getBindingPath());
+        transaction.putOperationalData(id.getBindingPath(), data);
+        logger.trace("{}: Update device state transaction {} putting operational data ended.", id, transaction.getIdentifier());
+
+        commitTransaction(transaction, "update");
+    }
+
+    private void removeDeviceConfigAndState() {
+        final DataModificationTransaction transaction = dataService.beginTransaction();
+        logger.trace("{}: Close device state transaction {} removing all data started.", id, transaction.getIdentifier());
+        transaction.removeConfigurationData(id.getBindingPath());
+        transaction.removeOperationalData(id.getBindingPath());
+        logger.trace("{}: Close device state transaction {} removing all data ended.", id, transaction.getIdentifier());
+
+        commitTransaction(transaction, "close");
+    }
+
+    private void initDeviceData() {
+        final DataModificationTransaction transaction = dataService.beginTransaction();
+
+        final InstanceIdentifier<Node> path = id.getBindingPath();
+
+        final Node nodeWithId = getNodeWithId(id);
+        if (operationalNodeNotExisting(transaction, path)) {
+            transaction.putOperationalData(path, nodeWithId);
+        }
+        if (configurationNodeNotExisting(transaction, path)) {
+            transaction.putConfigurationData(path, nodeWithId);
+        }
+
+        commitTransaction(transaction, "init");
+    }
+
+    private void commitTransaction(final DataModificationTransaction transaction, final String txType) {
+        // attempt commit
+        final RpcResult<TransactionStatus> result;
+        try {
+            result = transaction.commit().get();
+        } catch (InterruptedException | ExecutionException e) {
+            logger.error("{}: Transaction({}) failed", id, txType, e);
+            throw new IllegalStateException(id + " Transaction(" + txType + ") not committed correctly", e);
+        }
+
+        // verify success result + committed state
+        if (isUpdateSuccessful(result)) {
+            logger.trace("{}: Transaction({}) {} SUCCESSFUL", id, txType, transaction.getIdentifier());
+        } else {
+            logger.error("{}: Transaction({}) {} FAILED!", id, txType, transaction.getIdentifier());
+            throw new IllegalStateException(id + "  Transaction(" + txType + ") not committed correctly, " +
+                    "Errors: " + result.getErrors());
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Remove device data from datastore
+        submitDataChangeToExecutor(executor, new Runnable() {
+            @Override
+            public void run() {
+                removeDeviceConfigAndState();
+            }
+        }, id);
+    }
+
+    private static boolean isUpdateSuccessful(final RpcResult<TransactionStatus> result) {
+        return result.getResult() == TransactionStatus.COMMITED && result.isSuccessful();
+    }
+
+    private static void submitDataChangeToExecutor(final ListeningExecutorService executor, final Runnable r,
+            final RemoteDeviceId id) {
+        // Submit data change
+        final ListenableFuture<?> f = executor.submit(r);
+        // Verify update execution
+        Futures.addCallback(f, new FutureCallback<Object>() {
+            @Override
+            public void onSuccess(final Object result) {
+                logger.debug("{}: Device data updated successfully", id);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                logger.warn("{}: Device data update failed", id, t);
+            }
+        });
+    }
+
+    public static org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node buildDataForDeviceState(
+            final boolean up, final Set<QName> capabilities, final RemoteDeviceId id) {
+
+        final NodeBuilder nodeBuilder = getNodeWithIdBuilder(id);
+        final NetconfNodeBuilder netconfNodeBuilder = new NetconfNodeBuilder();
+        netconfNodeBuilder.setConnected(up);
+        netconfNodeBuilder.setInitialCapability(FluentIterable.from(capabilities)
+                .transform(new Function<QName, String>() {
+                    @Override
+                    public String apply(final QName input) {
+                        return input.toString();
+                    }
+                }).toList());
+        nodeBuilder.addAugmentation(NetconfNode.class, netconfNodeBuilder.build());
+
+        return nodeBuilder.build();
+    }
+
+    private static boolean configurationNodeNotExisting(final DataModificationTransaction transaction,
+            final InstanceIdentifier<Node> path) {
+        return null == transaction.readConfigurationData(path);
+    }
+
+    private static boolean operationalNodeNotExisting(final DataModificationTransaction transaction,
+            final InstanceIdentifier<Node> path) {
+        return null == transaction.readOperationalData(path);
+    }
+
+    private static Node getNodeWithId(final RemoteDeviceId id) {
+        final NodeBuilder nodeBuilder = getNodeWithIdBuilder(id);
+        return nodeBuilder.build();
+    }
+
+    private static NodeBuilder getNodeWithIdBuilder(final RemoteDeviceId id) {
+        final NodeBuilder nodeBuilder = new NodeBuilder();
+        nodeBuilder.setKey(id.getBindingKey());
+        nodeBuilder.setId(id.getBindingKey().getId());
+        return nodeBuilder;
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceRpc.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceRpc.java
new file mode 100644 (file)
index 0000000..927d418
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connect.api.MessageTransformer;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Invokes RPC by sending netconf message via listener. Also transforms result from NetconfMessage to CompositeNode.
+ */
+public final class NetconfDeviceRpc implements RpcImplementation {
+    private final RemoteDeviceCommunicator<NetconfMessage> listener;
+    private final MessageTransformer<NetconfMessage> transformer;
+
+    public NetconfDeviceRpc(final RemoteDeviceCommunicator<NetconfMessage> listener, final MessageTransformer<NetconfMessage> transformer) {
+        this.listener = listener;
+        this.transformer = transformer;
+    }
+
+    @Override
+    public Set<QName> getSupportedRpcs() {
+        // TODO is this correct ?
+        return Collections.emptySet();
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode input) {
+        final NetconfMessage message = transformRequest(rpc, input);
+        final ListenableFuture<RpcResult<NetconfMessage>> delegateFutureWithPureResult = listener.sendRequest(
+                message, rpc);
+
+
+        return Futures.transform(delegateFutureWithPureResult, new Function<RpcResult<NetconfMessage>, RpcResult<CompositeNode>>() {
+            @Override
+            public RpcResult<CompositeNode> apply(@Nullable final RpcResult<NetconfMessage> input) {
+                return transformResult(input, rpc);
+            }
+        });
+    }
+
+    private NetconfMessage transformRequest(final QName rpc, final CompositeNode input) {
+        return transformer.toRpcRequest(rpc, input);
+    }
+
+    private RpcResult<CompositeNode> transformResult(final RpcResult<NetconfMessage> netconfMessageRpcResult,
+                                                                  final QName rpc) {
+        if (netconfMessageRpcResult.isSuccessful()) {
+            return transformer.toRpcResult(netconfMessageRpcResult.getResult(), rpc);
+        } else {
+            return Rpcs.getRpcResult(false, netconfMessageRpcResult.getErrors());
+        }
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java
new file mode 100644 (file)
index 0000000..f39c4d5
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessionCapabilities> {
+
+    private static final Logger logger= LoggerFactory.getLogger(NetconfDeviceTwoPhaseCommitTransaction.class);
+    private static final InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
+
+    private final RemoteDeviceId id;
+    private final NetconfDeviceSalProvider salProvider;
+
+    private final List<AutoCloseable> salRegistrations = Lists.newArrayList();
+
+    public NetconfDeviceSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, final BundleContext bundleContext, final ExecutorService executor) {
+        this.id = id;
+        this.salProvider = new NetconfDeviceSalProvider(id, executor);
+        registerToSal(domBroker, bindingBroker, bundleContext);
+    }
+
+    public void registerToSal(final Broker domRegistryDependency, final BindingAwareBroker bindingBroker, final BundleContext bundleContext) {
+        domRegistryDependency.registerProvider(salProvider, bundleContext);
+        bindingBroker.registerProvider(salProvider, bundleContext);
+    }
+
+    @Override
+    public synchronized void onNotification(final CompositeNode domNotification) {
+        salProvider.getMountInstance().publish(domNotification);
+    }
+
+    @Override
+    public synchronized void onDeviceConnected(final SchemaContextProvider remoteSchemaContextProvider,
+                                               final NetconfSessionCapabilities netconfSessionPreferences, final RpcImplementation deviceRpc) {
+        salProvider.getMountInstance().setSchemaContext(remoteSchemaContextProvider.getSchemaContext());
+        salProvider.getDatastoreAdapter().updateDeviceState(true, netconfSessionPreferences.getModuleBasedCaps());
+        registerDataHandlersToSal(deviceRpc, netconfSessionPreferences);
+        registerRpcsToSal(deviceRpc);
+    }
+
+    @Override
+    public void onDeviceDisconnected() {
+        salProvider.getDatastoreAdapter().updateDeviceState(false, Collections.<QName>emptySet());
+    }
+
+    private void registerRpcsToSal(final RpcImplementation deviceRpc) {
+        final MountProvisionInstance mountInstance = salProvider.getMountInstance();
+
+        final Map<QName, String> failedRpcs = Maps.newHashMap();
+        for (final RpcDefinition rpcDef : mountInstance.getSchemaContext().getOperations()) {
+            try {
+                salRegistrations.add(mountInstance.addRpcImplementation(rpcDef.getQName(), deviceRpc));
+                logger.debug("{}: Rpc {} from netconf registered successfully", id, rpcDef.getQName());
+            } catch (final Exception e) {
+                // Only debug per rpc, warn for all of them at the end to pollute log a little less (e.g. routed rpcs)
+                logger.debug("{}: Unable to register rpc {} from netconf device. This rpc will not be available", id,
+                        rpcDef.getQName(), e);
+                failedRpcs.put(rpcDef.getQName(), e.getClass() + ":" + e.getMessage());
+            }
+        }
+
+        if (failedRpcs.isEmpty() == false) {
+            logger.warn("{}: Some rpcs from netconf device were not registered: {}", id, failedRpcs);
+        }
+    }
+
+    private void registerDataHandlersToSal(final RpcImplementation deviceRpc,
+            final NetconfSessionCapabilities netconfSessionPreferences) {
+        final NetconfDeviceDataReader dataReader = new NetconfDeviceDataReader(id, deviceRpc);
+        final NetconfDeviceCommitHandler commitHandler = new NetconfDeviceCommitHandler(id, deviceRpc,
+                netconfSessionPreferences.isRollbackSupported());
+
+        final MountProvisionInstance mountInstance = salProvider.getMountInstance();
+        salRegistrations.add(mountInstance.registerConfigurationReader(ROOT_PATH, dataReader));
+        salRegistrations.add(mountInstance.registerOperationalReader(ROOT_PATH, dataReader));
+        salRegistrations.add(mountInstance.registerCommitHandler(ROOT_PATH, commitHandler));
+    }
+
+    @Override
+    public void close() {
+        for (final AutoCloseable reg : Lists.reverse(salRegistrations)) {
+            closeGracefully(reg);
+        }
+        closeGracefully(salProvider);
+    }
+
+    private void closeGracefully(final AutoCloseable resource) {
+        if (resource != null) {
+            try {
+                resource.close();
+            } catch (final Exception e) {
+                logger.warn("{}: Ignoring exception while closing {}", id, resource, e);
+            }
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalProvider.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalProvider.java
new file mode 100644 (file)
index 0000000..01af84c
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
+import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+final class NetconfDeviceSalProvider implements AutoCloseable, Provider, BindingAwareProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceSalProvider.class);
+
+    private final RemoteDeviceId id;
+    private final ExecutorService executor;
+    private volatile MountProvisionInstance mountInstance;
+    private volatile NetconfDeviceDatastoreAdapter datastoreAdapter;
+
+    public NetconfDeviceSalProvider(final RemoteDeviceId deviceId, final ExecutorService executor) {
+        this.id = deviceId;
+        this.executor = executor;
+    }
+
+    public MountProvisionInstance getMountInstance() {
+        Preconditions.checkState(mountInstance != null,
+                "%s: Sal provider was not initialized by sal. Cannot publish notification", id);
+        return mountInstance;
+    }
+
+    public NetconfDeviceDatastoreAdapter getDatastoreAdapter() {
+        Preconditions.checkState(datastoreAdapter != null,
+                "%s: Sal provider %s was not initialized by sal. Cannot publish notification", id);
+        return datastoreAdapter;
+    }
+
+    @Override
+    public void onSessionInitiated(final Broker.ProviderSession session) {
+        final MountProvisionService mountService = session.getService(MountProvisionService.class);
+        if (mountService != null) {
+            mountInstance = mountService.createOrGetMountPoint(id.getPath());
+        }
+
+        logger.debug("{}: (BI)Session with sal established {}", id, session);
+    }
+
+    @Override
+    public Collection<Provider.ProviderFunctionality> getProviderFunctionality() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Collection<? extends RpcService> getImplementations() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Collection<? extends BindingAwareProvider.ProviderFunctionality> getFunctionality() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public void onSessionInitiated(final BindingAwareBroker.ProviderContext session) {
+        final DataProviderService dataBroker = session.getSALService(DataProviderService.class);
+        datastoreAdapter = new NetconfDeviceDatastoreAdapter(id, dataBroker, executor);
+
+        logger.debug("{}: Session with sal established {}", id, session);
+    }
+
+    @Override
+    public void onSessionInitialized(final BindingAwareBroker.ConsumerContext session) {
+    }
+
+    public void close() throws Exception {
+        mountInstance = null;
+        datastoreAdapter.close();
+        datastoreAdapter = null;
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTwoPhaseCommitTransaction.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTwoPhaseCommitTransaction.java
new file mode 100644 (file)
index 0000000..41f9fec
--- /dev/null
@@ -0,0 +1,252 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_CANDIDATE_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_COMMIT_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_CONFIG_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_EDIT_CONFIG_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_ERROR_OPTION_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_OPERATION_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_TARGET_QNAME;
+import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil.ROLLBACK_ON_ERROR_OPTION;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.connect.util.FailedRpcResult;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ *  Remote transaction that delegates data change to remote device using netconf messages.
+ */
+final class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction<InstanceIdentifier, CompositeNode> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTwoPhaseCommitTransaction.class);
+
+    private final DataModification<InstanceIdentifier, CompositeNode> modification;
+    private final RpcImplementation rpc;
+    private final boolean rollbackSupported;
+    private final RemoteDeviceId id;
+    private final CompositeNode targetNode;
+
+    public NetconfDeviceTwoPhaseCommitTransaction(final RemoteDeviceId id, final RpcImplementation rpc,
+            final DataModification<InstanceIdentifier, CompositeNode> modification,
+            final boolean candidateSupported, final boolean rollbackOnErrorSupported) {
+        this.id = id;
+        this.rpc = Preconditions.checkNotNull(rpc);
+        this.modification = Preconditions.checkNotNull(modification);
+        this.targetNode = getTargetNode(candidateSupported);
+        this.rollbackSupported = rollbackOnErrorSupported;
+    }
+
+    /**
+     * Prepare phase, sends 1 or more netconf edit config operations to modify the data
+     *
+     * In case of failure or unexpected error response, ExecutionException is thrown
+     */
+    void prepare() throws InterruptedException, ExecutionException {
+        for (final InstanceIdentifier toRemove : modification.getRemovedConfigurationData()) {
+            sendDelete(toRemove);
+        }
+        for(final Entry<InstanceIdentifier, CompositeNode> toUpdate : modification.getUpdatedConfigurationData().entrySet()) {
+            sendMerge(toUpdate.getKey(),toUpdate.getValue());
+        }
+    }
+
+    private void sendMerge(final InstanceIdentifier key, final CompositeNode value) throws InterruptedException, ExecutionException {
+        sendEditRpc(createEditConfigStructure(key, Optional.<String>absent(), Optional.of(value)));
+    }
+
+    private void sendDelete(final InstanceIdentifier toDelete) throws InterruptedException, ExecutionException {
+        sendEditRpc(createEditConfigStructure(toDelete, Optional.of("delete"), Optional.<CompositeNode>absent()));
+    }
+
+    private void sendEditRpc(final CompositeNode editStructure) throws InterruptedException, ExecutionException {
+        final ImmutableCompositeNode editConfigRequest = createEditConfigRequest(editStructure);
+        final RpcResult<CompositeNode> rpcResult = rpc.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, editConfigRequest).get();
+        // TODO 874 add default operation when sending delete
+
+        // Check result
+        if(rpcResult.isSuccessful() == false) {
+            throw new ExecutionException(
+                    String.format("%s: Pre-commit rpc failed, request: %s, errors: %s", id, editConfigRequest, rpcResult.getErrors()), null);
+        }
+    }
+
+    private ImmutableCompositeNode createEditConfigRequest(final CompositeNode editStructure) {
+        final CompositeNodeBuilder<ImmutableCompositeNode> ret = ImmutableCompositeNode.builder();
+
+        final Node<?> targetWrapperNode = ImmutableCompositeNode.create(NETCONF_TARGET_QNAME, ImmutableList.<Node<?>>of(targetNode));
+        ret.add(targetWrapperNode);
+
+        if(rollbackSupported) {
+            ret.addLeaf(NETCONF_ERROR_OPTION_QNAME, ROLLBACK_ON_ERROR_OPTION);
+        }
+        ret.setQName(NETCONF_EDIT_CONFIG_QNAME);
+        ret.add(editStructure);
+        return ret.toInstance();
+    }
+
+    private CompositeNode createEditConfigStructure(final InstanceIdentifier dataPath, final Optional<String> operation,
+                                                    final Optional<CompositeNode> lastChildOverride) {
+        Preconditions.checkArgument(dataPath.getPath().isEmpty() == false, "Instance identifier with empty path %s", dataPath);
+
+        List<PathArgument> reversedPath = Lists.reverse(dataPath.getPath());
+
+        // Create deepest edit element with expected edit operation
+        CompositeNode previous = getDeepestEditElement(reversedPath.get(0), operation, lastChildOverride);
+
+        // Remove already processed deepest child
+        reversedPath = Lists.newArrayList(reversedPath);
+        reversedPath.remove(0);
+
+        // Create edit structure in reversed order
+        for (final PathArgument arg : reversedPath) {
+            final CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
+            builder.setQName(arg.getNodeType());
+
+            addPredicatesToCompositeNodeBuilder(getPredicates(arg), builder);
+
+            builder.add(previous);
+            previous = builder.toInstance();
+        }
+        return ImmutableCompositeNode.create(NETCONF_CONFIG_QNAME, ImmutableList.<Node<?>>of(previous));
+    }
+
+    private void addPredicatesToCompositeNodeBuilder(final Map<QName, Object> predicates, final CompositeNodeBuilder<ImmutableCompositeNode> builder) {
+        for (final Entry<QName, Object> entry : predicates.entrySet()) {
+            builder.addLeaf(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private Map<QName, Object> getPredicates(final PathArgument arg) {
+        Map<QName, Object> predicates = Collections.emptyMap();
+        if (arg instanceof NodeIdentifierWithPredicates) {
+            predicates = ((NodeIdentifierWithPredicates) arg).getKeyValues();
+        }
+        return predicates;
+    }
+
+    private CompositeNode getDeepestEditElement(final PathArgument arg, final Optional<String> operation, final Optional<CompositeNode> lastChildOverride) {
+        final CompositeNodeBuilder<ImmutableCompositeNode> builder = ImmutableCompositeNode.builder();
+        builder.setQName(arg.getNodeType());
+
+        final Map<QName, Object> predicates = getPredicates(arg);
+        addPredicatesToCompositeNodeBuilder(predicates, builder);
+
+        if (operation.isPresent()) {
+            builder.setAttribute(NETCONF_OPERATION_QNAME, operation.get());
+        }
+        if (lastChildOverride.isPresent()) {
+            final List<Node<?>> children = lastChildOverride.get().getValue();
+            for(final Node<?> child : children) {
+                if(!predicates.containsKey(child.getKey())) {
+                    builder.add(child);
+                }
+            }
+        }
+
+        return builder.toInstance();
+    }
+
+    /**
+     * Send commit rpc to finish the transaction
+     * In case of failure or unexpected error response, ExecutionException is thrown
+     */
+    @Override
+    public RpcResult<Void> finish() {
+        try {
+            final RpcResult<?> rpcResult = rpc.invokeRpc(NetconfMessageTransformUtil.NETCONF_COMMIT_QNAME, getCommitRequest()).get();
+            return new RpcResultVoidWrapper(rpcResult);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(id + ": Interrupted while waiting for response", e);
+        } catch (final ExecutionException e) {
+            LOG.warn("{}: Failed to finish commit operation", id, e);
+            return new FailedRpcResult<>(RpcErrors.getRpcError(null, null, null, RpcError.ErrorSeverity.ERROR,
+                    id + ": Unexpected operation error during commit operation", RpcError.ErrorType.APPLICATION, e));
+        }
+    }
+
+    private ImmutableCompositeNode getCommitRequest() {
+        final CompositeNodeBuilder<ImmutableCompositeNode> commitInput = ImmutableCompositeNode.builder();
+        commitInput.setQName(NETCONF_COMMIT_QNAME);
+        return commitInput.toInstance();
+    }
+
+    @Override
+    public DataModification<InstanceIdentifier, CompositeNode> getModification() {
+        return this.modification;
+    }
+
+    @Override
+    public RpcResult<Void> rollback() throws IllegalStateException {
+        // TODO BUG-732 implement rollback by sending discard changes
+        return null;
+    }
+
+    public CompositeNode getTargetNode(final boolean candidateSupported) {
+        if(candidateSupported) {
+            return ImmutableCompositeNode.create(NETCONF_CANDIDATE_QNAME, ImmutableList.<Node<?>>of());
+        } else {
+            return ImmutableCompositeNode.create(NETCONF_RUNNING_QNAME, ImmutableList.<Node<?>>of());
+        }
+    }
+
+    private static final class RpcResultVoidWrapper implements RpcResult<Void> {
+
+        private final RpcResult<?> rpcResult;
+
+        public RpcResultVoidWrapper(final RpcResult<?> rpcResult) {
+            this.rpcResult = rpcResult;
+        }
+
+        @Override
+        public boolean isSuccessful() {
+            return rpcResult.isSuccessful();
+        }
+
+        @Override
+        public Void getResult() {
+            return null;
+        }
+
+        @Override
+        public Collection<RpcError> getErrors() {
+            return rpcResult.getErrors();
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/NetconfDeviceSchemaProviderFactory.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/NetconfDeviceSchemaProviderFactory.java
new file mode 100644 (file)
index 0000000..9f844fd
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.schema;
+
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
+import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public final class NetconfDeviceSchemaProviderFactory implements SchemaContextProviderFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceSchemaProviderFactory.class);
+
+    private final RemoteDeviceId id;
+
+    public NetconfDeviceSchemaProviderFactory(final RemoteDeviceId id) {
+        this.id = id;
+    }
+
+    @Override
+    public SchemaContextProvider createContextProvider(final Collection<QName> capabilities, final SchemaSourceProvider<InputStream> sourceProvider) {
+
+        final YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
+
+        if (sourceContext.getMissingSources().isEmpty() == false) {
+            logger.warn("{}: Sources for following models are missing {}", id, sourceContext.getMissingSources());
+        }
+
+        logger.debug("{}: Trying to create schema context from {}", id, sourceContext.getValidSources());
+        final List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
+
+        Preconditions.checkState(sourceContext.getValidSources().isEmpty() == false,
+                "%s: Unable to create schema context, no sources provided by device", id);
+        try {
+            final SchemaContext schemaContext = tryToParseContext(modelsToParse);
+            logger.debug("{}: Schema context successfully created.", id);
+            return new NetconfSchemaContextProvider(schemaContext);
+        } catch (final RuntimeException e) {
+            logger.error("{}: Unable to create schema context, unexpected error", id, e);
+            throw new IllegalStateException(id + ": Unable to create schema context", e);
+        }
+    }
+
+    private static SchemaContext tryToParseContext(final List<InputStream> modelsToParse) {
+        final YangParserImpl parser = new YangParserImpl();
+        final Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
+        return parser.resolveSchemaContext(models);
+    }
+
+    private static final class NetconfSchemaContextProvider implements SchemaContextProvider {
+        private final SchemaContext schemaContext;
+
+        public NetconfSchemaContextProvider(final SchemaContext schemaContext) {
+            this.schemaContext = schemaContext;
+        }
+
+        @Override
+        public SchemaContext getSchemaContext() {
+            return schemaContext;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/NetconfRemoteSchemaSourceProvider.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/NetconfRemoteSchemaSourceProvider.java
new file mode 100644 (file)
index 0000000..b43e03b
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.schema;
+
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+public final class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider<String> {
+
+    public static final QName GET_SCHEMA_QNAME = QName.create(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING,
+            "get-schema");
+    public static final QName GET_DATA_QNAME = QName
+            .create(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING, "data");
+
+    private static final Logger logger = LoggerFactory.getLogger(NetconfRemoteSchemaSourceProvider.class);
+
+    private final RpcImplementation rpc;
+    private final RemoteDeviceId id;
+
+    public NetconfRemoteSchemaSourceProvider(final RemoteDeviceId id, final RpcImplementation rpc) {
+        this.id = id;
+        this.rpc = Preconditions.checkNotNull(rpc);
+    }
+
+    @Override
+    public Optional<String> getSchemaSource(final String moduleName, final Optional<String> revision) {
+        final ImmutableCompositeNode getSchemaRequest = createGetSchemaRequest(moduleName, revision);
+
+        logger.trace("{}: Loading YANG schema source for {}:{}", id, moduleName, revision);
+        try {
+            final RpcResult<CompositeNode> schemaReply = rpc.invokeRpc(GET_SCHEMA_QNAME, getSchemaRequest).get();
+            if (schemaReply.isSuccessful()) {
+                final Optional<String> schemaBody = getSchemaFromRpc(id, schemaReply.getResult());
+                if (schemaBody.isPresent()) {
+                    logger.debug("{}: YANG Schema successfully retrieved for {}:{}", id, moduleName, revision);
+                    return schemaBody;
+                }
+            } else {
+                logger.warn("{}: YANG schema was not successfully retrieved for {}:{}. Errors: {}", id, moduleName,
+                        revision, schemaReply.getErrors());
+            }
+            return Optional.absent();
+        } catch (final InterruptedException e){
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(e);
+        } catch (final Exception e) {
+            logger.error("{}: YANG schema was not successfully retrieved for {}:{}", id, moduleName, revision, e);
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private ImmutableCompositeNode createGetSchemaRequest(final String moduleName, final Optional<String> revision) {
+        final CompositeNodeBuilder<ImmutableCompositeNode> request = ImmutableCompositeNode.builder();
+        request.setQName(GET_SCHEMA_QNAME)
+                .addLeaf("format", "yang")
+                .addLeaf("identifier", moduleName);
+
+        if (revision.isPresent()) {
+            request.addLeaf("version", revision.get());
+        }
+        return request.toInstance();
+    }
+
+    private static Optional<String> getSchemaFromRpc(final RemoteDeviceId id, final CompositeNode result) {
+        if (result == null) {
+            return Optional.absent();
+        }
+        final SimpleNode<?> simpleNode = result.getFirstSimpleByName(GET_DATA_QNAME.withoutRevision());
+
+        Preconditions.checkNotNull(simpleNode,
+                "%s Unexpected response to get-schema, expected response with one child %s, but was %s",
+                id, GET_DATA_QNAME.withoutRevision(), result);
+
+        final Object potential = simpleNode.getValue();
+        return potential instanceof String ? Optional.of((String) potential) : Optional.<String>absent();
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java
new file mode 100644 (file)
index 0000000..c85a529
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.schema.mapping;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import javax.activation.UnsupportedDataTypeException;
+
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connect.api.MessageTransformer;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.connect.util.MessageCounter;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
+import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import com.google.common.base.Optional;
+
+public class NetconfMessageTransformer implements MessageTransformer<NetconfMessage> {
+
+    public static final String MESSAGE_ID_PREFIX = "m";
+
+    private Optional<SchemaContext> schemaContext = Optional.absent();
+    private final MessageCounter counter;
+
+    public NetconfMessageTransformer() {
+        this.counter = new MessageCounter();
+    }
+
+    @Override
+    public synchronized CompositeNode toNotification(final NetconfMessage message) {
+        if(schemaContext.isPresent()) {
+            return toNotification(message, schemaContext.get());
+        } else {
+            return XmlDocumentUtils.notificationToDomNodes(message.getDocument(), Optional.<Set<NotificationDefinition>>absent());
+        }
+    }
+
+    private static CompositeNode toNotification(final NetconfMessage message, final SchemaContext ctx) {
+        final Set<NotificationDefinition> notifications = ctx.getNotifications();
+        final Document document = message.getDocument();
+        return XmlDocumentUtils.notificationToDomNodes(document, Optional.fromNullable(notifications));
+    }
+
+    @Override
+    public NetconfMessage toRpcRequest(final QName rpc, final CompositeNode node) {
+        final CompositeNodeTOImpl rpcPayload = NetconfMessageTransformUtil.wrap(
+                NetconfMessageTransformUtil.NETCONF_RPC_QNAME, NetconfMessageTransformUtil.flattenInput(node));
+        final Document w3cPayload;
+        try {
+            w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, XmlDocumentUtils.defaultValueCodecProvider());
+        } catch (final UnsupportedDataTypeException e) {
+            throw new IllegalArgumentException("Unable to create message", e);
+        }
+        w3cPayload.getDocumentElement().setAttribute("message-id", counter.getNewMessageId(MESSAGE_ID_PREFIX));
+        return new NetconfMessage(w3cPayload);
+    }
+
+    @Override
+    public synchronized RpcResult<CompositeNode> toRpcResult(final NetconfMessage message, final QName rpc) {
+        if(schemaContext.isPresent()) {
+            return toRpcResult(message, rpc, schemaContext.get());
+        } else {
+            final CompositeNode node = (CompositeNode) XmlDocumentUtils.toDomNode(message.getDocument());
+            return Rpcs.getRpcResult(true, node, Collections.<RpcError>emptySet());
+        }
+    }
+
+    private static RpcResult<CompositeNode> toRpcResult(final NetconfMessage message, final QName rpc, final SchemaContext context) {
+        final CompositeNode compositeNode;
+
+        if (NetconfMessageTransformUtil.isDataRetrievalOperation(rpc)) {
+
+            final Element xmlData = NetconfMessageTransformUtil.getDataSubtree(message.getDocument());
+
+            final List<org.opendaylight.yangtools.yang.data.api.Node<?>> dataNodes = XmlDocumentUtils.toDomNodes(xmlData,
+                    Optional.of(context.getDataDefinitions()), context);
+
+            final CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
+            it.setQName(NetconfMessageTransformUtil.NETCONF_RPC_REPLY_QNAME);
+            it.add(ImmutableCompositeNode.create(NetconfMessageTransformUtil.NETCONF_DATA_QNAME, dataNodes));
+
+            compositeNode = it.toInstance();
+        } else {
+            // TODO map rpc with schema
+            compositeNode = (CompositeNode) XmlDocumentUtils.toDomNode(message.getDocument());
+        }
+
+        return Rpcs.getRpcResult(true, compositeNode, Collections.<RpcError> emptySet());
+    }
+
+    @Override
+    public synchronized void onGlobalContextUpdated(final SchemaContext schemaContext) {
+        this.schemaContext = Optional.of(schemaContext);
+    }
+}
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.sal.connect.netconf;
+package org.opendaylight.controller.sal.connect.netconf.util;
 
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.activation.UnsupportedDataTypeException;
+import java.util.Map;
 import javax.annotation.Nullable;
 
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
-public class NetconfMapping {
+public class NetconfMessageTransformUtil {
 
-    public static URI NETCONF_URI = URI.create("urn:ietf:params:xml:ns:netconf:base:1.0");
-    public static String NETCONF_MONITORING_URI = "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring";
-    public static URI NETCONF_NOTIFICATION_URI = URI.create("urn:ietf:params:xml:ns:netconf:notification:1.0");
-    public static URI NETCONF_ROLLBACK_ON_ERROR_URI = URI.create("urn:ietf:params:netconf:capability:rollback-on-error:1.0");
+    private NetconfMessageTransformUtil() {
+    }
 
+    public static final QName IETF_NETCONF_MONITORING = QName.create(
+            "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring", "2010-10-04", "ietf-netconf-monitoring");
+    public static URI NETCONF_URI = URI.create("urn:ietf:params:xml:ns:netconf:base:1.0");
     public static QName NETCONF_QNAME = QName.create(NETCONF_URI, null, "netconf");
-    public static QName NETCONF_RPC_QNAME = QName.create(NETCONF_QNAME, "rpc");
-    public static QName NETCONF_GET_QNAME = QName.create(NETCONF_QNAME, "get");
-    public static QName NETCONF_FILTER_QNAME = QName.create(NETCONF_QNAME, "filter");
-    public static QName NETCONF_TYPE_QNAME = QName.create(NETCONF_QNAME, "type");
-    public static QName NETCONF_GET_CONFIG_QNAME = QName.create(NETCONF_QNAME, "get-config");
-    public static QName NETCONF_EDIT_CONFIG_QNAME = QName.create(NETCONF_QNAME, "edit-config");
-    public static QName NETCONF_DELETE_CONFIG_QNAME = QName.create(NETCONF_QNAME, "delete-config");
-    public static QName NETCONF_OPERATION_QNAME = QName.create(NETCONF_QNAME, "operation");
-    public static QName NETCONF_COMMIT_QNAME = QName.create(NETCONF_QNAME, "commit");
-
-    public static QName NETCONF_CONFIG_QNAME = QName.create(NETCONF_QNAME, "config");
-    public static QName NETCONF_SOURCE_QNAME = QName.create(NETCONF_QNAME, "source");
-    public static QName NETCONF_TARGET_QNAME = QName.create(NETCONF_QNAME, "target");
-
-    public static QName NETCONF_CANDIDATE_QNAME = QName.create(NETCONF_QNAME, "candidate");
-    public static QName NETCONF_RUNNING_QNAME = QName.create(NETCONF_QNAME, "running");
-
-    public static QName NETCONF_ERROR_OPTION_QNAME = QName.create(NETCONF_QNAME, "error-option");
-    public static String ROLLBACK_ON_ERROR_OPTION = "rollback-on-error";
-
-    public static QName NETCONF_RPC_REPLY_QNAME = QName.create(NETCONF_QNAME, "rpc-reply");
-    public static QName NETCONF_OK_QNAME = QName.create(NETCONF_QNAME, "ok");
     public static QName NETCONF_DATA_QNAME = QName.create(NETCONF_QNAME, "data");
-    public static QName NETCONF_CREATE_SUBSCRIPTION_QNAME = QName.create(NETCONF_NOTIFICATION_URI, null,
-            "create-subscription");
-    public static QName NETCONF_CANCEL_SUBSCRIPTION_QNAME = QName.create(NETCONF_NOTIFICATION_URI, null,
-            "cancel-subscription");
-    public static QName IETF_NETCONF_MONITORING_MODULE = QName.create(NETCONF_MONITORING_URI, "2010-10-04",
-            "ietf-netconf-monitoring");
-
-    static List<Node<?>> RUNNING = Collections.<Node<?>> singletonList(new SimpleNodeTOImpl(NETCONF_RUNNING_QNAME,
+    public static QName NETCONF_RPC_REPLY_QNAME = QName.create(NETCONF_QNAME, "rpc-reply");
+    public static QName NETCONF_ERROR_OPTION_QNAME = QName.create(NETCONF_QNAME, "error-option");
+    public static QName NETCONF_RUNNING_QNAME = QName.create(NETCONF_QNAME, "running");
+    static List<Node<?>> RUNNING = Collections.<Node<?>> singletonList(new SimpleNodeTOImpl<>(NETCONF_RUNNING_QNAME,
             null, null));
-
+    public static QName NETCONF_SOURCE_QNAME = QName.create(NETCONF_QNAME, "source");
     public static CompositeNode CONFIG_SOURCE_RUNNING = new CompositeNodeTOImpl(NETCONF_SOURCE_QNAME, null, RUNNING);
+    public static QName NETCONF_CANDIDATE_QNAME = QName.create(NETCONF_QNAME, "candidate");
+    public static QName NETCONF_TARGET_QNAME = QName.create(NETCONF_QNAME, "target");
+    public static QName NETCONF_CONFIG_QNAME = QName.create(NETCONF_QNAME, "config");
+    public static QName NETCONF_COMMIT_QNAME = QName.create(NETCONF_QNAME, "commit");
+    public static QName NETCONF_OPERATION_QNAME = QName.create(NETCONF_QNAME, "operation");
+    public static QName NETCONF_EDIT_CONFIG_QNAME = QName.create(NETCONF_QNAME, "edit-config");
+    public static QName NETCONF_GET_CONFIG_QNAME = QName.create(NETCONF_QNAME, "get-config");
+    public static QName NETCONF_TYPE_QNAME = QName.create(NETCONF_QNAME, "type");
+    public static QName NETCONF_FILTER_QNAME = QName.create(NETCONF_QNAME, "filter");
+    public static QName NETCONF_GET_QNAME = QName.create(NETCONF_QNAME, "get");
+    public static QName NETCONF_RPC_QNAME = QName.create(NETCONF_QNAME, "rpc");
+    public static URI NETCONF_ROLLBACK_ON_ERROR_URI = URI
+            .create("urn:ietf:params:netconf:capability:rollback-on-error:1.0");
+    public static String ROLLBACK_ON_ERROR_OPTION = "rollback-on-error";
 
-    static AtomicInteger messageId = new AtomicInteger(0);
-
-    static Node<?> toFilterStructure(final InstanceIdentifier identifier) {
+    public static Node<?> toFilterStructure(final InstanceIdentifier identifier) {
         Node<?> previous = null;
         if (identifier.getPath().isEmpty()) {
             return null;
         }
 
-        for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument component : Lists
+        for (final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument component : Lists
                 .reverse(identifier.getPath())) {
-            if (component instanceof NodeIdentifierWithPredicates) {
-                previous = toNode((NodeIdentifierWithPredicates)component, previous);
+            if (component instanceof InstanceIdentifier.NodeIdentifierWithPredicates) {
+                previous = toNode((InstanceIdentifier.NodeIdentifierWithPredicates)component, previous);
             } else {
                 previous = toNode(component, previous);
             }
@@ -109,9 +85,9 @@ public class NetconfMapping {
         return filter("subtree", previous);
     }
 
-    static Node<?> toNode(final NodeIdentifierWithPredicates argument, final Node<?> node) {
-        List<Node<?>> list = new ArrayList<>();
-        for (Map.Entry<QName, Object> arg : argument.getKeyValues().entrySet()) {
+    static Node<?> toNode(final InstanceIdentifier.NodeIdentifierWithPredicates argument, final Node<?> node) {
+        final List<Node<?>> list = new ArrayList<>();
+        for (final Map.Entry<QName, Object> arg : argument.getKeyValues().entrySet()) {
             list.add(new SimpleNodeTOImpl(arg.getKey(), null, arg.getValue()));
         }
         if (node != null) {
@@ -120,51 +96,31 @@ public class NetconfMapping {
         return new CompositeNodeTOImpl(argument.getNodeType(), null, list);
     }
 
-    static Node<?> toNode(final PathArgument argument, final Node<?> node) {
-        if (node != null) {
-            return new CompositeNodeTOImpl(argument.getNodeType(), null, Collections.<Node<?>> singletonList(node));
-        } else {
-            return new SimpleNodeTOImpl(argument.getNodeType(), null, null);
-        }
-    }
-
-    static CompositeNode toCompositeNode(final NetconfMessage message, final Optional<SchemaContext> ctx) {
-        // TODO: implement general normalization to normalize incoming Netconf
-        // Message
-        // for Schema Context counterpart
-        return null;
-    }
+    public static void checkValidReply(final NetconfMessage input, final NetconfMessage output) {
+        final String inputMsgId = input.getDocument().getDocumentElement().getAttribute("message-id");
+        final String outputMsgId = output.getDocument().getDocumentElement().getAttribute("message-id");
 
-    static CompositeNode toNotificationNode(final NetconfMessage message, final Optional<SchemaContext> ctx) {
-        if (ctx.isPresent()) {
-            SchemaContext schemaContext = ctx.get();
-            Set<NotificationDefinition> notifications = schemaContext.getNotifications();
-            Document document = message.getDocument();
-            return XmlDocumentUtils.notificationToDomNodes(document, Optional.fromNullable(notifications), ctx.get());
+        if(inputMsgId.equals(outputMsgId) == false) {
+            final String requestXml = XmlUtil.toString(input.getDocument());
+            final String responseXml = XmlUtil.toString(output.getDocument());
+            throw new IllegalStateException(String.format("Rpc request and reply message IDs must be same. Request: %s, response: %s", requestXml, responseXml));
         }
-        return null;
     }
 
-    static NetconfMessage toRpcMessage(final QName rpc, final CompositeNode node, final Optional<SchemaContext> ctx) {
-        CompositeNodeTOImpl rpcPayload = wrap(NETCONF_RPC_QNAME, flattenInput(node));
-        Document w3cPayload = null;
-        try {
-            w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, XmlDocumentUtils.defaultValueCodecProvider());
-        } catch (UnsupportedDataTypeException e) {
-            throw new IllegalArgumentException("Unable to create message", e);
+    public static void checkSuccessReply(final NetconfMessage output) throws NetconfDocumentedException {
+        if(NetconfMessageUtil.isErrorMessage(output)) {
+            throw new IllegalStateException(String.format("Response contains error: %s", XmlUtil.toString(output.getDocument())));
         }
-        w3cPayload.getDocumentElement().setAttribute("message-id", "m-" + messageId.getAndIncrement());
-        return new NetconfMessage(w3cPayload);
     }
 
-    static CompositeNode flattenInput(final CompositeNode node) {
+    public static CompositeNode flattenInput(final CompositeNode node) {
         final QName inputQName = QName.create(node.getNodeType(), "input");
-        CompositeNode input = node.getFirstCompositeByName(inputQName);
+        final CompositeNode input = node.getFirstCompositeByName(inputQName);
         if (input == null)
             return node;
         if (input instanceof CompositeNode) {
 
-            List<Node<?>> nodes = ImmutableList.<Node<?>> builder() //
+            final List<Node<?>> nodes = ImmutableList.<Node<?>> builder() //
                     .addAll(input.getValue()) //
                     .addAll(Collections2.filter(node.getValue(), new Predicate<Node<?>>() {
                         @Override
@@ -180,43 +136,25 @@ public class NetconfMapping {
         return input;
     }
 
-    static RpcResult<CompositeNode> toRpcResult(final NetconfMessage message, final QName rpc, final Optional<SchemaContext> context) {
-        CompositeNode rawRpc;
-        if (context.isPresent())
-            if (isDataRetrieQNameReply(rpc)) {
-
-                Element xmlData = getDataSubtree(message.getDocument());
-
-                List<org.opendaylight.yangtools.yang.data.api.Node<?>> dataNodes = XmlDocumentUtils.toDomNodes(xmlData,
-                        Optional.of(context.get().getDataDefinitions()));
-
-                CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
-                it.setQName(NETCONF_RPC_REPLY_QNAME);
-                it.add(ImmutableCompositeNode.create(NETCONF_DATA_QNAME, dataNodes));
-
-                rawRpc = it.toInstance();
-                // sys(xmlData)
-            } else {
-                rawRpc = toCompositeNode(message, context);
-            }
-        else {
-            rawRpc = (CompositeNode) toCompositeNode(message.getDocument());
+    static Node<?> toNode(final InstanceIdentifier.PathArgument argument, final Node<?> node) {
+        if (node != null) {
+            return new CompositeNodeTOImpl(argument.getNodeType(), null, Collections.<Node<?>> singletonList(node));
+        } else {
+            return new SimpleNodeTOImpl<Void>(argument.getNodeType(), null, null);
         }
-        // rawRpc.
-        return Rpcs.getRpcResult(true, rawRpc, Collections.<RpcError> emptySet());
     }
 
-    static Element getDataSubtree(final Document doc) {
+    public static Element getDataSubtree(final Document doc) {
         return (Element) doc.getElementsByTagNameNS(NETCONF_URI.toString(), "data").item(0);
     }
 
-    static boolean isDataRetrieQNameReply(final QName it) {
-        return NETCONF_URI == it.getNamespace()
-                && (it.getLocalName() == NETCONF_GET_CONFIG_QNAME.getLocalName() || it.getLocalName() == NETCONF_GET_QNAME
-                .getLocalName());
+    public static boolean isDataRetrievalOperation(final QName rpc) {
+        return NETCONF_URI == rpc.getNamespace()
+                && (rpc.getLocalName().equals(NETCONF_GET_CONFIG_QNAME.getLocalName()) || rpc.getLocalName().equals(
+                        NETCONF_GET_QNAME.getLocalName()));
     }
 
-    static CompositeNodeTOImpl wrap(final QName name, final Node<?> node) {
+    public static CompositeNodeTOImpl wrap(final QName name, final Node<?> node) {
         if (node != null) {
             return new CompositeNodeTOImpl(name, null, Collections.<Node<?>> singletonList(node));
         } else {
@@ -224,7 +162,7 @@ public class NetconfMapping {
         }
     }
 
-    static CompositeNodeTOImpl wrap(final QName name, final Node<?> additional, final Node<?> node) {
+    public static CompositeNodeTOImpl wrap(final QName name, final Node<?> additional, final Node<?> node) {
         if (node != null) {
             return new CompositeNodeTOImpl(name, null, ImmutableList.of(additional, node));
         } else {
@@ -233,7 +171,7 @@ public class NetconfMapping {
     }
 
     static ImmutableCompositeNode filter(final String type, final Node<?> node) {
-        CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder(); //
+        final CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder(); //
         it.setQName(NETCONF_FILTER_QNAME);
         it.setAttribute(NETCONF_TYPE_QNAME, type);
         if (node != null) {
@@ -243,24 +181,4 @@ public class NetconfMapping {
         }
     }
 
-    public static Node<?> toCompositeNode(final Document document) {
-        return XmlDocumentUtils.toDomNode(document);
-    }
-
-    public static void checkValidReply(final NetconfMessage input, final NetconfMessage output) {
-        String inputMsgId = input.getDocument().getDocumentElement().getAttribute("message-id");
-        String outputMsgId = output.getDocument().getDocumentElement().getAttribute("message-id");
-
-        if(inputMsgId.equals(outputMsgId) == false) {
-            String requestXml = XmlUtil.toString(input.getDocument());
-            String responseXml = XmlUtil.toString(output.getDocument());
-            throw new IllegalStateException(String.format("Rpc request and reply message IDs must be same. Request: %s, response: %s", requestXml, responseXml));
-        }
-    }
-
-    public static void checkSuccessReply(final NetconfMessage output) throws NetconfDocumentedException {
-        if(NetconfMessageUtil.isErrorMessage(output)) {
-            throw new IllegalStateException(String.format("Response contains error: %s", XmlUtil.toString(output.getDocument())));
-        }
-    }
 }
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/FailedRpcResult.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/FailedRpcResult.java
new file mode 100644 (file)
index 0000000..49b16d4
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.util;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+public final class FailedRpcResult<T> implements RpcResult<T> {
+
+    private final RpcError rpcError;
+
+    public FailedRpcResult(final RpcError rpcError) {
+        this.rpcError = rpcError;
+    }
+
+    @Override
+    public boolean isSuccessful() {
+        return false;
+    }
+
+    @Override
+    public T getResult() {
+        return null;
+    }
+
+    @Override
+    public Collection<RpcError> getErrors() {
+        return Collections.singletonList(rpcError);
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/MessageCounter.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/MessageCounter.java
new file mode 100644 (file)
index 0000000..2b2f6a9
--- /dev/null
@@ -0,0 +1,21 @@
+package org.opendaylight.controller.sal.connect.util;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+public class MessageCounter {
+    final AtomicInteger messageId = new AtomicInteger(0);
+
+    private static final String messageIdBlueprint = "%s-%s";
+
+    public String getNewMessageId(final String prefix) {
+        Preconditions.checkArgument(Strings.isNullOrEmpty(prefix) == false, "Null or empty prefix");
+        return String.format(messageIdBlueprint, prefix, getNewMessageId());
+    }
+
+    public String getNewMessageId() {
+        return Integer.toString(messageId.getAndIncrement());
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/RemoteDeviceId.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/RemoteDeviceId.java
new file mode 100644 (file)
index 0000000..4670846
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.util;
+
+import org.opendaylight.controller.config.api.ModuleIdentifier;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.common.QName;
+
+public class RemoteDeviceId {
+
+    private final String name;
+    private final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path;
+    private final InstanceIdentifier<Node> bindingPath;
+    private final NodeKey key;
+
+    public RemoteDeviceId(final ModuleIdentifier identifier) {
+        this(Preconditions.checkNotNull(identifier).getInstanceName());
+    }
+
+    public RemoteDeviceId(final String name) {
+        Preconditions.checkNotNull(name);
+        this.name = name;
+        this.key = new NodeKey(new NodeId(name));
+        this.path = createBIPath(name);
+        this.bindingPath = createBindingPath(key);
+    }
+
+    private static InstanceIdentifier<Node> createBindingPath(final NodeKey key) {
+        return InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build();
+    }
+
+    private static org.opendaylight.yangtools.yang.data.api.InstanceIdentifier createBIPath(final String name) {
+        final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.InstanceIdentifierBuilder builder =
+                org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder();
+        builder.node(Nodes.QNAME).nodeWithKey(Node.QNAME, QName.create(Node.QNAME.getNamespace(), Node.QNAME.getRevision(), "id"), name);
+
+        return builder.build();
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public InstanceIdentifier<Node> getBindingPath() {
+        return bindingPath;
+    }
+
+    public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier getPath() {
+        return path;
+    }
+
+    public NodeKey getBindingKey() {
+        return key;
+    }
+
+    @Override
+    public String toString() {
+        return "RemoteDevice{" + name +'}';
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (!(o instanceof RemoteDeviceId)) return false;
+
+        final RemoteDeviceId that = (RemoteDeviceId) o;
+
+        if (!name.equals(that.name)) return false;
+        if (!bindingPath.equals(that.bindingPath)) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name.hashCode();
+        result = 31 * result + bindingPath.hashCode();
+        return result;
+    }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/package-info.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/package-info.java
new file mode 100644 (file)
index 0000000..21b9d3a
--- /dev/null
@@ -0,0 +1,6 @@
+/**
+ * Utility classes for remote connectors e.g. netconf connector
+ *
+ * TODO extract into separate bundle when another connector is implemented e.g. restconf connector
+ */
+package org.opendaylight.controller.sal.connect.util;
index d4dad11..c59c41c 100644 (file)
@@ -3,10 +3,11 @@ module odl-sal-netconf-connector-cfg {
     namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf";
     prefix "sal-netconf";
 
-       import config { prefix config; revision-date 2013-04-05; }
-       import threadpool {prefix th;}
-       import netty {prefix netty;}
-       import opendaylight-md-sal-dom {prefix dom;}
+    import config { prefix config; revision-date 2013-04-05; }
+    import threadpool {prefix th;}
+    import netty {prefix netty;}
+    import opendaylight-md-sal-dom {prefix dom;}
+    import opendaylight-md-sal-binding {prefix md-sal-binding; revision-date 2013-10-28;}
     import odl-netconf-cfg { prefix cfg-net; revision-date 2014-04-08; }
 
     description
@@ -22,7 +23,6 @@ module odl-sal-netconf-connector-cfg {
         config:java-name-prefix NetconfConnector;
     }
 
-
     grouping server {
         leaf address {
             type string;
@@ -33,7 +33,6 @@ module odl-sal-netconf-connector-cfg {
         }
     }
 
-
     augment "/config:modules/config:module/config:configuration" {
         case sal-netconf-connector {
             when "/config:modules/config:module/config:type = 'sal-netconf-connector'";
@@ -67,6 +66,16 @@ module odl-sal-netconf-connector-cfg {
                 }
             }
 
+            container binding-registry {
+                uses config:service-ref {
+                    refine type {
+                        // FIXME BUG-944 make mandatory (remove backwards compatibility)
+                        mandatory false;
+                        config:required-identity md-sal-binding:binding-broker-osgi-registry;
+                    }
+                }
+            }
+
             // FIXME BUG-944 remove backwards compatibility
             // Deprecated, replaced by client dispatcher.
             // This dependency will be removed in near future and all configurations of netconf-connector need to be changed to use dispatcher dependency.
@@ -99,6 +108,18 @@ module odl-sal-netconf-connector-cfg {
                 }
             }
 
+            container processing-executor {
+                uses config:service-ref {
+                    refine type {
+                        // FIXME BUG-944 make mandatory (remove backwards compatibility)
+                        mandatory false;
+                        config:required-identity th:threadpool;
+                    }
+                }
+
+                description "Makes up for flaws in netty threading design";
+            }
+
             // Replaces thread group dependencies
             container client-dispatcher {
                 uses config:service-ref {
@@ -122,9 +143,16 @@ module odl-sal-netconf-connector-cfg {
             }
 
             leaf between-attempts-timeout-millis {
-                description "Timeout in milliseconds to wait between connection attempts.";
+                description "Initial timeout in milliseconds to wait between connection attempts. Will be multiplied by sleep-factor with every additional attempt";
                 type uint16;
-                default 10000;
+                default 2000;
+            }
+
+            leaf sleep-factor {
+                type decimal64 {
+                    fraction-digits 1;
+                }
+                default 1.5;
             }
         }
     }
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java
new file mode 100644 (file)
index 0000000..5ac32b5
--- /dev/null
@@ -0,0 +1,201 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connect.api.MessageTransformer;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory;
+import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+
+public class NetconfDeviceTest {
+
+    private static final NetconfMessage netconfMessage;
+    private static final CompositeNode compositeNode;
+
+    static {
+        try {
+            netconfMessage = mockClass(NetconfMessage.class);
+            compositeNode = mockClass(CompositeNode.class);
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static final  RpcResult<NetconfMessage> rpcResult = Rpcs.getRpcResult(true, netconfMessage, Collections.<RpcError>emptySet());
+    private static final  RpcResult<CompositeNode> rpcResultC = Rpcs.getRpcResult(true, compositeNode, Collections.<RpcError>emptySet());
+
+    public static final String TEST_NAMESPACE = "test:namespace";
+    public static final String TEST_MODULE = "test-module";
+    public static final String TEST_REVISION = "2013-07-22";
+
+    @Test
+    public void testNetconfDeviceWithoutMonitoring() throws Exception {
+        final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+        final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+
+        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), getMessageTransformer(), getSchemaContextProviderFactory(), getSourceProviderFactory());
+        device.onRemoteSessionUp(getSessionCaps(false, Collections.<String>emptyList()), listener);
+
+        Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected();
+    }
+
+    @Test
+    public void testNetconfDeviceReconnect() throws Exception {
+        final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+        final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
+
+        final SchemaContextProviderFactory schemaContextProviderFactory = getSchemaContextProviderFactory();
+        final SchemaSourceProviderFactory<InputStream> sourceProviderFactory = getSourceProviderFactory();
+        final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
+
+        final NetconfDevice device = new NetconfDevice(getId(), facade, getExecutor(), messageTransformer, schemaContextProviderFactory, sourceProviderFactory);
+        final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+                Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&amp;revision=" + TEST_REVISION));
+        device.onRemoteSessionUp(sessionCaps, listener);
+
+        verify(sourceProviderFactory, timeout(5000)).createSourceProvider(any(RpcImplementation.class));
+        verify(schemaContextProviderFactory, timeout(5000)).createContextProvider(any(Collection.class), any(SchemaSourceProvider.class));
+        verify(messageTransformer, timeout(5000)).onGlobalContextUpdated(any(SchemaContext.class));
+        verify(facade, timeout(5000)).onDeviceConnected(any(SchemaContextProvider.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+
+        device.onRemoteSessionDown();
+        verify(facade, timeout(5000)).onDeviceDisconnected();
+
+        device.onRemoteSessionUp(sessionCaps, listener);
+
+        verify(sourceProviderFactory, timeout(5000).times(2)).createSourceProvider(any(RpcImplementation.class));
+        verify(schemaContextProviderFactory, timeout(5000).times(2)).createContextProvider(any(Collection.class), any(SchemaSourceProvider.class));
+        verify(messageTransformer, timeout(5000).times(2)).onGlobalContextUpdated(any(SchemaContext.class));
+        verify(facade, timeout(5000).times(2)).onDeviceConnected(any(SchemaContextProvider.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+    }
+
+    private SchemaContextProviderFactory getSchemaContextProviderFactory() {
+        final SchemaContextProviderFactory schemaContextProviderFactory = mockClass(SchemaContextProviderFactory.class);
+        doReturn(new SchemaContextProvider() {
+            @Override
+            public SchemaContext getSchemaContext() {
+                return getSchema();
+            }
+        }).when(schemaContextProviderFactory).createContextProvider(any(Collection.class), any(SchemaSourceProvider.class));
+        return schemaContextProviderFactory;
+    }
+
+    public static SchemaContext getSchema() {
+        final YangParserImpl parser = new YangParserImpl();
+        final List<InputStream> modelsToParse = Lists.newArrayList(
+                NetconfDeviceTest.class.getResourceAsStream("/schemas/test-module.yang")
+        );
+        final Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
+        return parser.resolveSchemaContext(models);
+    }
+
+    private RemoteDeviceHandler<NetconfSessionCapabilities> getFacade() throws Exception {
+        final RemoteDeviceHandler<NetconfSessionCapabilities> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
+        doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContextProvider.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+        doNothing().when(remoteDeviceHandler).onDeviceDisconnected();
+        return remoteDeviceHandler;
+    }
+
+    private <T extends AutoCloseable> T mockCloseableClass(final Class<T> remoteDeviceHandlerClass) throws Exception {
+        final T mock = mockClass(remoteDeviceHandlerClass);
+        doNothing().when(mock).close();
+        return mock;
+    }
+
+    public SchemaSourceProviderFactory<InputStream> getSourceProviderFactory() {
+        final SchemaSourceProviderFactory<InputStream> mock = mockClass(SchemaSourceProviderFactory.class);
+
+        final SchemaSourceProvider<InputStream> schemaSourceProvider = mockClass(SchemaSourceProvider.class);
+        doReturn(Optional.<String>absent()).when(schemaSourceProvider).getSchemaSource(anyString(), any(Optional.class));
+
+        doReturn(schemaSourceProvider).when(mock).createSourceProvider(any(RpcImplementation.class));
+        return mock;
+    }
+
+    private static <T> T mockClass(final Class<T> remoteDeviceHandlerClass) {
+        final T mock = Mockito.mock(remoteDeviceHandlerClass);
+        Mockito.doReturn(remoteDeviceHandlerClass.getSimpleName()).when(mock).toString();
+        return mock;
+    }
+
+    public RemoteDeviceId getId() {
+        return new RemoteDeviceId("test-D");
+    }
+
+    public ExecutorService getExecutor() {
+        return Executors.newSingleThreadExecutor();
+    }
+
+    public MessageTransformer<NetconfMessage> getMessageTransformer() throws Exception {
+        final MessageTransformer<NetconfMessage> messageTransformer = mockClass(MessageTransformer.class);
+        doReturn(netconfMessage).when(messageTransformer).toRpcRequest(any(QName.class), any(CompositeNode.class));
+        doReturn(rpcResultC).when(messageTransformer).toRpcResult(any(NetconfMessage.class), any(QName.class));
+        doNothing().when(messageTransformer).onGlobalContextUpdated(any(SchemaContext.class));
+        return messageTransformer;
+    }
+
+    public NetconfSessionCapabilities getSessionCaps(final boolean addMonitor, final Collection<String> additionalCapabilities) {
+        final ArrayList<String> capabilities = Lists.newArrayList(
+                XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+                XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1);
+
+        if(addMonitor) {
+            capabilities.add(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING.getNamespace().toString());
+        }
+
+        capabilities.addAll(additionalCapabilities);
+
+        return NetconfSessionCapabilities.fromStrings(
+                capabilities);
+    }
+
+    public RemoteDeviceCommunicator<NetconfMessage> getListener() throws Exception {
+        final RemoteDeviceCommunicator<NetconfMessage> remoteDeviceCommunicator = mockCloseableClass(RemoteDeviceCommunicator.class);
+        doReturn(Futures.immediateFuture(rpcResult)).when(remoteDeviceCommunicator).sendRequest(any(NetconfMessage.class), any(QName.class));
+        return remoteDeviceCommunicator;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/resources/schemas/test-module.yang b/opendaylight/md-sal/sal-netconf-connector/src/test/resources/schemas/test-module.yang
new file mode 100644 (file)
index 0000000..cd732fc
--- /dev/null
@@ -0,0 +1,18 @@
+module test-module {
+    yang-version 1;
+    namespace "test:namespace";
+    prefix "tt";
+
+    description
+        "Types for testing";
+
+    revision "2013-07-22";
+
+
+    container c {
+        leaf a {
+            type string;
+        }
+    }
+
+}