BUG 8839: Revert "Make netconf utilize encrypted passwords only"
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / AbstractNetconfTopology.java
index 7a0d0f7dbb9527e060e304e0739b13e61d1440d4..ad4bfa9029425205ccab65efdf3bc4600395a493 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.netconf.topology;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -18,8 +19,8 @@ import io.netty.util.concurrent.EventExecutor;
 import java.io.File;
 import java.math.BigDecimal;
 import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Collections;
+import java.net.URL;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,12 +28,7 @@ import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.controller.config.threadpool.ThreadPool;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
@@ -40,29 +36,39 @@ import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurati
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
+import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.LibraryModulesSchemas;
 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceBuilder;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemasResolverImpl;
+import org.opendaylight.netconf.sal.connect.netconf.SchemalessNetconfDevice;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.listener.UserPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.netconf.schema.YangLibrarySchemaYangSourceProvider;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.netconf.topology.api.NetconfTopology;
+import org.opendaylight.netconf.topology.api.SchemaRepositoryProvider;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapability.CapabilityOrigin;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
@@ -70,13 +76,14 @@ import org.opendaylight.yangtools.yang.parser.util.TextToASTTransformer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractNetconfTopology implements NetconfTopology, BindingAwareProvider, Provider {
+public abstract class AbstractNetconfTopology implements NetconfTopology {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);
 
     protected static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
     protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
     protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+    protected static final int DEFAULT_CONCURRENT_RPC_LIMIT = 0;
     private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
     private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
     private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L;
@@ -89,17 +96,18 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     private static final String CACHE_DIRECTORY = "cache";
 
     /**
-     * The default cache directory relative to <code>CACHE_DIRECTORY</code>
+     * The default cache directory relative to <code>CACHE_DIRECTORY</code>.
      */
     private static final String DEFAULT_CACHE_DIRECTORY = "schema";
 
     /**
-     * The qualified schema cache directory <code>cache/schema</code>
+     * The qualified schema cache directory <code>cache/schema</code>.
      */
-    private static final String QUALIFIED_DEFAULT_CACHE_DIRECTORY = CACHE_DIRECTORY + File.separator+ DEFAULT_CACHE_DIRECTORY;
+    private static final String QUALIFIED_DEFAULT_CACHE_DIRECTORY =
+            CACHE_DIRECTORY + File.separator + DEFAULT_CACHE_DIRECTORY;
 
     /**
-     * The name for the default schema repository
+     * The name for the default schema repository.
      */
     private static final String DEFAULT_SCHEMA_REPOSITORY_NAME = "sal-netconf-connector";
 
@@ -127,18 +135,18 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
      * of the schema cache directory, and the value is a corresponding <code>SchemaResourcesDTO</code>.  The
      * <code>SchemaResourcesDTO</code> is essentially a container that allows for the extraction of the
      * <code>SchemaRegistry</code> and <code>SchemaContextFactory</code> which should be used for a particular
-     * Netconf mount.  Access to <code>schemaResourcesDTOs</code> should be surrounded by appropriate
+     * Netconf mount.  Access to <code>SCHEMA_RESOURCES_DTO_MAP</code> should be surrounded by appropriate
      * synchronization locks.
      */
-    private static volatile Map<String, NetconfDevice.SchemaResourcesDTO> schemaResourcesDTOs = new HashMap<>();
+    private static final Map<String, NetconfDevice.SchemaResourcesDTO> SCHEMA_RESOURCES_DTO_MAP = new HashMap<>();
 
     // Initializes default constant instances for the case when the default schema repository
     // directory cache/schema is used.
     static {
-        schemaResourcesDTOs.put(DEFAULT_CACHE_DIRECTORY,
-                new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY,
+        SCHEMA_RESOURCES_DTO_MAP.put(DEFAULT_CACHE_DIRECTORY,
+                new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY,
                         DEFAULT_SCHEMA_CONTEXT_FACTORY,
-                        new NetconfStateSchemas.NetconfStateSchemasResolverImpl()));
+                        new NetconfStateSchemasResolverImpl()));
         DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(DEFAULT_CACHE);
         DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(
                 TextToASTTransformer.create(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY));
@@ -146,37 +154,32 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
 
     protected final String topologyId;
     private final NetconfClientDispatcher clientDispatcher;
-    protected final BindingAwareBroker bindingAwareBroker;
-    protected final Broker domBroker;
     private final EventExecutor eventExecutor;
     protected final ScheduledThreadPool keepaliveExecutor;
     protected final ThreadPool processingExecutor;
     protected final SharedSchemaRepository sharedSchemaRepository;
+    protected final DataBroker dataBroker;
+    protected final DOMMountPointService mountPointService;
 
     protected SchemaSourceRegistry schemaRegistry = DEFAULT_SCHEMA_REPOSITORY;
+    protected SchemaRepository schemaRepository = DEFAULT_SCHEMA_REPOSITORY;
     protected SchemaContextFactory schemaContextFactory = DEFAULT_SCHEMA_CONTEXT_FACTORY;
 
-    protected DOMMountPointService mountPointService = null;
-    protected DataBroker dataBroker = null;
     protected final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
 
     protected AbstractNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
-                                      final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
                                       final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
-                                      final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
+                                      final ThreadPool processingExecutor,
+                                      final SchemaRepositoryProvider schemaRepositoryProvider,
+                                      final DataBroker dataBroker, final DOMMountPointService mountPointService) {
         this.topologyId = topologyId;
         this.clientDispatcher = clientDispatcher;
-        this.bindingAwareBroker = bindingAwareBroker;
-        this.domBroker = domBroker;
         this.eventExecutor = eventExecutor;
         this.keepaliveExecutor = keepaliveExecutor;
         this.processingExecutor = processingExecutor;
         this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
-    }
-
-    protected void registerToSal(BindingAwareProvider baProvider, Provider provider) {
-        domBroker.registerProvider(provider);
-        bindingAwareBroker.registerProvider(baProvider);
+        this.dataBroker = dataBroker;
+        this.mountPointService = mountPointService;
     }
 
     public void setSchemaRegistry(final SchemaSourceRegistry schemaRegistry) {
@@ -188,29 +191,17 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     }
 
     @Override
-    public abstract void onSessionInitiated(ProviderContext session);
-
-    @Override
-    public String getTopologyId() {
-        return topologyId;
-    }
-
-    @Override
-    public DataBroker getDataBroker() {
-        return dataBroker;
-    }
-
-    @Override
-    public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
+    public ListenableFuture<NetconfDeviceCapabilities> connectNode(final NodeId nodeId, final Node configNode) {
         LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, configNode);
         return setupConnection(nodeId, configNode);
     }
 
     @Override
-    public ListenableFuture<Void> disconnectNode(NodeId nodeId) {
+    public ListenableFuture<Void> disconnectNode(final NodeId nodeId) {
         LOG.debug("Disconnecting RemoteDevice{{}}", nodeId.getValue());
         if (!activeConnectors.containsKey(nodeId)) {
-            return Futures.immediateFailedFuture(new IllegalStateException("Unable to disconnect device that is not connected"));
+            return Futures.immediateFailedFuture(
+                    new IllegalStateException("Unable to disconnect device that is not connected"));
         }
 
         // retrieve connection, and disconnect it
@@ -231,19 +222,21 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
         final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
         final NetconfClientSessionListener netconfClientSessionListener = deviceCommunicatorDTO.getSessionListener();
-        final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(netconfClientSessionListener, netconfNode);
-        final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
+        final NetconfReconnectingClientConfiguration clientConfig =
+                getClientConfig(netconfClientSessionListener, netconfNode);
+        final ListenableFuture<NetconfDeviceCapabilities> future =
+                deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
 
         activeConnectors.put(nodeId, deviceCommunicatorDTO);
 
         Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
             @Override
-            public void onSuccess(NetconfDeviceCapabilities result) {
+            public void onSuccess(final NetconfDeviceCapabilities result) {
                 LOG.debug("Connector for : " + nodeId.getValue() + " started succesfully");
             }
 
             @Override
-            public void onFailure(Throwable t) {
+            public void onFailure(final Throwable throwable) {
                 LOG.error("Connector for : " + nodeId.getValue() + " failed");
                 // remove this node from active connectors?
             }
@@ -255,59 +248,105 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
                                                          final NetconfNode node) {
         //setup default values since default value is not supported in mdsal
-        final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
-        final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
-        final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
-
-        IpAddress ipAddress = node.getHost().getIpAddress();
-        InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
-                ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
+        final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null
+                ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
+        final Long keepaliveDelay = node.getKeepaliveDelay() == null
+                ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
+        final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null
+                ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
+
+        final IpAddress ipAddress = node.getHost().getIpAddress();
+        final InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null
+                ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
                 node.getPort().getValue());
-        RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
+        final RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
 
         RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
-                createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker);
+                createSalFacade(remoteDeviceId);
 
         if (keepaliveDelay > 0) {
             LOG.warn("Adding keepalive facade, for device {}", nodeId);
-            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
+            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(),
+                    keepaliveDelay, defaultRequestTimeoutMillis);
+        }
+
+        // pre register yang library sources as fallback schemas to schema registry
+        final List<SchemaSourceRegistration<YangTextSchemaSource>> registeredYangLibSources = Lists.newArrayList();
+        if (node.getYangLibrary() != null) {
+            final String yangLibURL = node.getYangLibrary().getYangLibraryUrl().getValue();
+            final String yangLibUsername = node.getYangLibrary().getUsername();
+            final String yangLigPassword = node.getYangLibrary().getPassword();
+
+            final LibraryModulesSchemas libraryModulesSchemas;
+            if (yangLibURL != null) {
+                if (yangLibUsername != null && yangLigPassword != null) {
+                    libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword);
+                } else {
+                    libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL);
+                }
+
+                for (final Map.Entry<SourceIdentifier, URL> sourceIdentifierURLEntry
+                        : libraryModulesSchemas.getAvailableModels().entrySet()) {
+                    registeredYangLibSources
+                        .add(schemaRegistry.registerSchemaSource(
+                                new YangLibrarySchemaYangSourceProvider(remoteDeviceId,
+                                        libraryModulesSchemas.getAvailableModels()),
+                                PotentialSchemaSource.create(sourceIdentifierURLEntry.getKey(),
+                                        YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+                }
+            }
         }
 
         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
+        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> device;
+        if (node.isSchemaless()) {
+            device = new SchemalessNetconfDevice(remoteDeviceId, salFacade);
+        } else {
+            device = new NetconfDeviceBuilder()
+                    .setReconnectOnSchemasChange(reconnectOnChangedSchema)
+                    .setSchemaResourcesDTO(schemaResourcesDTO)
+                    .setGlobalProcessingExecutor(processingExecutor.getExecutor())
+                    .setId(remoteDeviceId)
+                    .setSalFacade(salFacade)
+                    .build();
+        }
 
-        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
-                processingExecutor.getExecutor(), reconnectOnChangedSchema);
+        final Optional<UserPreferences> userCapabilities = getUserCapabilities(node);
+        final int rpcMessageLimit =
+                node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
 
-        final Optional<NetconfSessionPreferences> userCapabilities = getUserCapabilities(node);
+        if (rpcMessageLimit < 1) {
+            LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
+        }
 
-        return new NetconfConnectorDTO(
-                userCapabilities.isPresent() ?
-                        new NetconfDeviceCommunicator(
-                                remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride())):
-                        new NetconfDeviceCommunicator(remoteDeviceId, device)
-                , salFacade);
+        return new NetconfConnectorDTO(userCapabilities.isPresent()
+                ? new NetconfDeviceCommunicator(remoteDeviceId, device, userCapabilities.get(), rpcMessageLimit)
+                : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
     }
 
     protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
         // Setup information related to the SchemaRegistry, SchemaResourceFactory, etc.
         NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = null;
         final String moduleSchemaCacheDirectory = node.getSchemaCacheDirectory();
-        // Only checks to ensure the String is not empty or null;  further checks related to directory accessibility and file permissions
-        // are handled during the FilesystemSchemaSourceCache initialization.
+        // Only checks to ensure the String is not empty or null; further checks related to directory
+        // accessibility and file permissionsare handled during the FilesystemSchemaSourceCache initialization.
         if (!Strings.isNullOrEmpty(moduleSchemaCacheDirectory)) {
-            // If a custom schema cache directory is specified, create the backing DTO; otherwise, the SchemaRegistry and
-            // SchemaContextFactory remain the default values.
+            // If a custom schema cache directory is specified, create the backing DTO; otherwise,
+            // the SchemaRegistry and SchemaContextFactory remain the default values.
             if (!moduleSchemaCacheDirectory.equals(DEFAULT_CACHE_DIRECTORY)) {
-                // Multiple modules may be created at once;  synchronize to avoid issues with data consistency among threads.
-                synchronized(schemaResourcesDTOs) {
-                    // Look for the cached DTO to reuse SchemaRegistry and SchemaContextFactory variables if they already exist
-                    schemaResourcesDTO = schemaResourcesDTOs.get(moduleSchemaCacheDirectory);
+                // Multiple modules may be created at once;
+                // synchronize to avoid issues with data consistency among threads.
+                synchronized (SCHEMA_RESOURCES_DTO_MAP) {
+                    // Look for the cached DTO to reuse SchemaRegistry and SchemaContextFactory variables
+                    // if they already exist
+                    schemaResourcesDTO = SCHEMA_RESOURCES_DTO_MAP.get(moduleSchemaCacheDirectory);
                     if (schemaResourcesDTO == null) {
                         schemaResourcesDTO = createSchemaResourcesDTO(moduleSchemaCacheDirectory);
                         schemaResourcesDTO.getSchemaRegistry().registerSchemaSourceListener(
-                                TextToASTTransformer.create((SchemaRepository) schemaResourcesDTO.getSchemaRegistry(), schemaResourcesDTO.getSchemaRegistry())
+                                TextToASTTransformer.create((SchemaRepository) schemaResourcesDTO.getSchemaRegistry(),
+                                        schemaResourcesDTO.getSchemaRegistry())
                         );
-                        schemaResourcesDTOs.put(moduleSchemaCacheDirectory, schemaResourcesDTO);
+                        SCHEMA_RESOURCES_DTO_MAP.put(moduleSchemaCacheDirectory, schemaResourcesDTO);
                     }
                 }
                 LOG.info("Netconf connector for device {} will use schema cache directory {} instead of {}",
@@ -319,8 +358,8 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         }
 
         if (schemaResourcesDTO == null) {
-            schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory,
-                    new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+            schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaRepository,
+                    schemaContextFactory, new NetconfStateSchemasResolverImpl());
         }
 
         return schemaResourcesDTO;
@@ -341,8 +380,8 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         final FilesystemSchemaSourceCache<YangTextSchemaSource> deviceCache =
                 createDeviceFilesystemCache(moduleSchemaCacheDirectory);
         repository.registerSchemaSourceListener(deviceCache);
-        return new NetconfDevice.SchemaResourcesDTO(repository, schemaContextFactory,
-                new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+        return new NetconfDevice.SchemaResourcesDTO(repository, repository, schemaContextFactory,
+                new NetconfStateSchemasResolverImpl());
     }
 
     /**
@@ -351,17 +390,23 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
      * @param schemaCacheDirectory The custom cache directory relative to "cache"
      * @return A <code>FilesystemSchemaSourceCache</code> for the custom schema cache directory
      */
-    private FilesystemSchemaSourceCache<YangTextSchemaSource> createDeviceFilesystemCache(final String schemaCacheDirectory) {
+    private FilesystemSchemaSourceCache<YangTextSchemaSource> createDeviceFilesystemCache(
+            final String schemaCacheDirectory) {
         final String relativeSchemaCacheDirectory = CACHE_DIRECTORY + File.separator + schemaCacheDirectory;
-        return new FilesystemSchemaSourceCache<>(schemaRegistry, YangTextSchemaSource.class, new File(relativeSchemaCacheDirectory));
+        return new FilesystemSchemaSourceCache<>(schemaRegistry, YangTextSchemaSource.class,
+                new File(relativeSchemaCacheDirectory));
     }
 
-    public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, NetconfNode node) {
+    public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener,
+                                                                  final NetconfNode node) {
 
         //setup default values since default value is not supported in mdsal
-        final long clientConnectionTimeoutMillis = node.getConnectionTimeoutMillis() == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : node.getConnectionTimeoutMillis();
-        final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
-        final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
+        final long clientConnectionTimeoutMillis = node.getConnectionTimeoutMillis() == null
+                ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : node.getConnectionTimeoutMillis();
+        final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null
+                ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
+        final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null
+                ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
         final BigDecimal sleepFactor = node.getSleepFactor() == null ? DEFAULT_SLEEP_FACTOR : node.getSleepFactor();
 
         final InetSocketAddress socketAddress = getSocketAddress(node.getHost(), node.getPort().getValue());
@@ -372,10 +417,13 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
 
         final AuthenticationHandler authHandler;
         final Credentials credentials = node.getCredentials();
-        if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) {
+        if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114
+                .netconf.node.credentials.credentials.LoginPassword) {
             authHandler = new LoginPassword(
-                    ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getUsername(),
-                    ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getPassword());
+                    ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114
+                            .netconf.node.credentials.credentials.LoginPassword) credentials).getUsername(),
+                    ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114
+                            .netconf.node.credentials.credentials.LoginPassword) credentials).getPassword());
         } else {
             throw new IllegalStateException("Only login/password authentification is supported");
         }
@@ -385,55 +433,55 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
                 .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
                 .withReconnectStrategy(strategy)
                 .withAuthHandler(authHandler)
-                .withProtocol(node.isTcpOnly() ?
-                        NetconfClientConfiguration.NetconfClientProtocol.TCP :
+                .withProtocol(node.isTcpOnly() ? NetconfClientConfiguration.NetconfClientProtocol.TCP :
                         NetconfClientConfiguration.NetconfClientProtocol.SSH)
                 .withConnectStrategyFactory(sf)
                 .withSessionListener(listener)
                 .build();
     }
 
-    protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker);
-
-    @Override
-    public abstract ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
-
-    @Override
-    public void onSessionInitiated(ProviderSession session) {
-         mountPointService = session.getService(DOMMountPointService.class);
-    }
-
-    @Override
-    public Collection<ProviderFunctionality> getProviderFunctionality() {
-        return Collections.emptySet();
-    }
+    protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(RemoteDeviceId id);
 
-    private InetSocketAddress getSocketAddress(final Host host, int port) {
-        if(host.getDomainName() != null) {
+    private InetSocketAddress getSocketAddress(final Host host, final int port) {
+        if (host.getDomainName() != null) {
             return new InetSocketAddress(host.getDomainName().getValue(), port);
         } else {
             final IpAddress ipAddress = host.getIpAddress();
-            final String ip = ipAddress.getIpv4Address() != null ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue();
+            final String ip = ipAddress.getIpv4Address() != null
+                    ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue();
             return new InetSocketAddress(ip, port);
         }
     }
 
-    private Optional<NetconfSessionPreferences> getUserCapabilities(final NetconfNode node) {
-        if(node.getYangModuleCapabilities() == null) {
+    private Optional<UserPreferences> getUserCapabilities(final NetconfNode node) {
+        // if none of yang-module-capabilities or non-module-capabilities is specified
+        // just return absent
+        if (node.getYangModuleCapabilities() == null && node.getNonModuleCapabilities() == null) {
             return Optional.absent();
         }
 
-        final List<String> capabilities = node.getYangModuleCapabilities().getCapability();
-        if(capabilities == null || capabilities.isEmpty()) {
-            return Optional.absent();
+        final List<String> capabilities = new ArrayList<>();
+
+        boolean overrideYangModuleCaps = false;
+        if (node.getYangModuleCapabilities() != null) {
+            capabilities.addAll(node.getYangModuleCapabilities().getCapability());
+            overrideYangModuleCaps = node.getYangModuleCapabilities().isOverride();
         }
 
-        final NetconfSessionPreferences parsedOverrideCapabilities = NetconfSessionPreferences.fromStrings(capabilities);
-        Preconditions.checkState(parsedOverrideCapabilities.getNonModuleCaps().isEmpty(), "Capabilities to override can " +
-                "only contain module based capabilities, non-module capabilities will be retrieved from the device," +
-                " configured non-module capabilities: " + parsedOverrideCapabilities.getNonModuleCaps());
+        //non-module capabilities should not exist in yang module capabilities
+        final NetconfSessionPreferences netconfSessionPreferences = NetconfSessionPreferences.fromStrings(capabilities);
+        Preconditions.checkState(netconfSessionPreferences.getNonModuleCaps().isEmpty(),
+                "List yang-module-capabilities/capability should contain only module based capabilities. "
+                        + "Non-module capabilities used: " + netconfSessionPreferences.getNonModuleCaps());
+
+        boolean overrideNonModuleCaps = false;
+        if (node.getNonModuleCapabilities() != null) {
+            capabilities.addAll(node.getNonModuleCapabilities().getCapability());
+            overrideNonModuleCaps = node.getNonModuleCapabilities().isOverride();
+        }
 
-        return Optional.of(parsedOverrideCapabilities);
+        return Optional.of(new UserPreferences(NetconfSessionPreferences
+            .fromStrings(capabilities, CapabilityOrigin.UserDefined), overrideYangModuleCaps, overrideNonModuleCaps));
     }
 
     private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
@@ -442,7 +490,8 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         private final double sleepFactor;
         private final int minSleep;
 
-        TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
+        TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts,
+                                      final int minSleep, final BigDecimal sleepFactor) {
             if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
                 connectionAttempts = maxConnectionAttempts;
             } else {
@@ -464,12 +513,13 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         }
     }
 
-    protected static class NetconfConnectorDTO {
+    protected static class NetconfConnectorDTO implements AutoCloseable {
 
         private final NetconfDeviceCommunicator communicator;
         private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
 
-        public NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
+        public NetconfConnectorDTO(final NetconfDeviceCommunicator communicator,
+                                   final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
             this.communicator = communicator;
             this.facade = facade;
         }
@@ -485,6 +535,11 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         public NetconfClientSessionListener getSessionListener() {
             return communicator;
         }
-    }
 
+        @Override
+        public void close() {
+            communicator.close();
+            facade.close();
+        }
+    }
 }