Bug 6198 - Use sal-netconf-connector to connet device costs too much time
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / AbstractNetconfTopology.java
index 488b680ab53f048959fa82bed67e5967295cf04c..f0c56f3ca79301defcca62328ba63378083dbf84 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.netconf.topology;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -18,6 +19,7 @@ import io.netty.util.concurrent.EventExecutor;
 import java.io.File;
 import java.math.BigDecimal;
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,6 +35,7 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
@@ -40,37 +43,37 @@ import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurati
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
+import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.LibraryModulesSchemas;
 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceBuilder;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemasResolverImpl;
+import org.opendaylight.netconf.sal.connect.netconf.SchemalessNetconfDevice;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.listener.UserPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.netconf.schema.YangLibrarySchemaYangSourceProvider;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yangtools.yang.binding.Identifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
@@ -85,6 +88,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     protected static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
     protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
     protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+    protected static final int DEFAULT_CONCURRENT_RPC_LIMIT = 0;
     private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
     private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
     private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L;
@@ -144,9 +148,9 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     // directory cache/schema is used.
     static {
         schemaResourcesDTOs.put(DEFAULT_CACHE_DIRECTORY,
-                new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY,
+                new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY,
                         DEFAULT_SCHEMA_CONTEXT_FACTORY,
-                        new NetconfStateSchemas.NetconfStateSchemasResolverImpl()));
+                        new NetconfStateSchemasResolverImpl()));
         DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(DEFAULT_CACHE);
         DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(
                 TextToASTTransformer.create(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY));
@@ -162,6 +166,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     protected final SharedSchemaRepository sharedSchemaRepository;
 
     protected SchemaSourceRegistry schemaRegistry = DEFAULT_SCHEMA_REPOSITORY;
+    protected SchemaRepository schemaRepository = DEFAULT_SCHEMA_REPOSITORY;
     protected SchemaContextFactory schemaContextFactory = DEFAULT_SCHEMA_CONTEXT_FACTORY;
 
     protected DOMMountPointService mountPointService = null;
@@ -262,8 +267,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
 
     protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
                                                          final NetconfNode node) {
-        //setup default values since default value is not supported yet in mdsal
-        // TODO remove this when mdsal starts supporting default values
+        //setup default values since default value is not supported in mdsal
         final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
         final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
         final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
@@ -275,26 +279,66 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
 
         RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
-                createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+                createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker);
 
         if (keepaliveDelay > 0) {
             LOG.warn("Adding keepalive facade, for device {}", nodeId);
-            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
+            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
         }
 
-        final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
+        // pre register yang library sources as fallback schemas to schema registry
+        List<SchemaSourceRegistration<YangTextSchemaSource>> registeredYangLibSources = Lists.newArrayList();
+        if (node.getYangLibrary() != null) {
+            final String yangLibURL = node.getYangLibrary().getYangLibraryUrl().getValue();
+            final String yangLibUsername = node.getYangLibrary().getUsername();
+            final String yangLigPassword = node.getYangLibrary().getPassword();
+
+            LibraryModulesSchemas libraryModulesSchemas;
+            if(yangLibURL != null) {
+                if(yangLibUsername != null && yangLigPassword != null) {
+                    libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword);
+                } else {
+                    libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL);
+                }
 
-        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
-                processingExecutor.getExecutor(), reconnectOnChangedSchema);
+                for (Map.Entry<SourceIdentifier, URL> sourceIdentifierURLEntry : libraryModulesSchemas.getAvailableModels().entrySet()) {
+                    registeredYangLibSources.
+                            add(schemaRegistry.registerSchemaSource(
+                                    new YangLibrarySchemaYangSourceProvider(remoteDeviceId, libraryModulesSchemas.getAvailableModels()),
+                                    PotentialSchemaSource
+                                            .create(sourceIdentifierURLEntry.getKey(), YangTextSchemaSource.class,
+                                            PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+                }
+            }
+        }
+
+        final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
+        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> device;
+        if (node.isSchemaless()) {
+            device = new SchemalessNetconfDevice(remoteDeviceId, salFacade);
+        } else {
+            device = new NetconfDeviceBuilder()
+                    .setReconnectOnSchemasChange(reconnectOnChangedSchema)
+                    .setSchemaResourcesDTO(schemaResourcesDTO)
+                    .setGlobalProcessingExecutor(processingExecutor.getExecutor())
+                    .setId(remoteDeviceId)
+                    .setSalFacade(salFacade)
+                    .build();
+        }
 
         final Optional<NetconfSessionPreferences> userCapabilities = getUserCapabilities(node);
+        final int rpcMessageLimit =
+                node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
+
+        if (rpcMessageLimit < 1) {
+            LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
+        }
 
         return new NetconfConnectorDTO(
                 userCapabilities.isPresent() ?
                         new NetconfDeviceCommunicator(
-                                remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride())):
-                        new NetconfDeviceCommunicator(remoteDeviceId, device)
-                , salFacade);
+                                remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride()), rpcMessageLimit):
+                        new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
     }
 
     protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
@@ -302,7 +346,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = null;
         final String moduleSchemaCacheDirectory = node.getSchemaCacheDirectory();
         // Only checks to ensure the String is not empty or null;  further checks related to directory accessibility and file permissions
-        // are handled during the FilesystemScehamSourceCache initialization.
+        // are handled during the FilesystemSchemaSourceCache initialization.
         if (!Strings.isNullOrEmpty(moduleSchemaCacheDirectory)) {
             // If a custom schema cache directory is specified, create the backing DTO; otherwise, the SchemaRegistry and
             // SchemaContextFactory remain the default values.
@@ -312,7 +356,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
                     // Look for the cached DTO to reuse SchemaRegistry and SchemaContextFactory variables if they already exist
                     schemaResourcesDTO = schemaResourcesDTOs.get(moduleSchemaCacheDirectory);
                     if (schemaResourcesDTO == null) {
-                        schemaResourcesDTO = createSchemaResourcesDTO(moduleSchemaCacheDirectory, nodeId.getValue());
+                        schemaResourcesDTO = createSchemaResourcesDTO(moduleSchemaCacheDirectory);
                         schemaResourcesDTO.getSchemaRegistry().registerSchemaSourceListener(
                                 TextToASTTransformer.create((SchemaRepository) schemaResourcesDTO.getSchemaRegistry(), schemaResourcesDTO.getSchemaRegistry())
                         );
@@ -328,8 +372,8 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         }
 
         if (schemaResourcesDTO == null) {
-            schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory,
-                    new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+            schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaRepository, schemaContextFactory,
+                    new NetconfStateSchemasResolverImpl());
         }
 
         return schemaResourcesDTO;
@@ -341,9 +385,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
      * @param moduleSchemaCacheDirectory The string directory relative to "cache"
      * @return A DTO containing the Schema classes for the Netconf mount.
      */
-    private NetconfDevice.SchemaResourcesDTO createSchemaResourcesDTO(final String moduleSchemaCacheDirectory,
-            final String instanceName) {
-
+    private NetconfDevice.SchemaResourcesDTO createSchemaResourcesDTO(final String moduleSchemaCacheDirectory) {
         final SharedSchemaRepository repository = new SharedSchemaRepository(moduleSchemaCacheDirectory);
         final SchemaContextFactory schemaContextFactory
                 = repository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
@@ -352,8 +394,8 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         final FilesystemSchemaSourceCache<YangTextSchemaSource> deviceCache =
                 createDeviceFilesystemCache(moduleSchemaCacheDirectory);
         repository.registerSchemaSourceListener(deviceCache);
-        return new NetconfDevice.SchemaResourcesDTO(repository, schemaContextFactory,
-                new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+        return new NetconfDevice.SchemaResourcesDTO(repository, repository, schemaContextFactory,
+                new NetconfStateSchemasResolverImpl());
     }
 
     /**
@@ -369,8 +411,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
 
     public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, NetconfNode node) {
 
-        //setup default values since default value is not supported yet in mdsal
-        // TODO remove this when mdsal starts supporting default values
+        //setup default values since default value is not supported in mdsal
         final long clientConnectionTimeoutMillis = node.getConnectionTimeoutMillis() == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : node.getConnectionTimeoutMillis();
         final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
         final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
@@ -405,14 +446,14 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
                 .build();
     }
 
-    protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis);
+    protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker);
 
     @Override
     public abstract ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
 
     @Override
     public void onSessionInitiated(ProviderSession session) {
-        mountPointService = session.getService(DOMMountPointService.class);
+         mountPointService = session.getService(DOMMountPointService.class);
     }
 
     @Override
@@ -420,30 +461,6 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         return Collections.emptySet();
     }
 
-    //TODO this needs to be an util method, since netconf clustering uses this aswell
-    /**
-     * Determines the Netconf Node Node ID, given the node's instance
-     * identifier.
-     *
-     * @param pathArgument Node's path arument
-     * @return     NodeId for the node
-     */
-    protected NodeId getNodeId(final PathArgument pathArgument) {
-        if (pathArgument instanceof InstanceIdentifier.IdentifiableItem<?, ?>) {
-
-            final Identifier key = ((InstanceIdentifier.IdentifiableItem) pathArgument).getKey();
-            if(key instanceof NodeKey) {
-                return ((NodeKey) key).getNodeId();
-            }
-        }
-        throw new IllegalStateException("Unable to create NodeId from: " + pathArgument);
-    }
-
-    protected static InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
-        final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
-        return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
-    }
-
     private InetSocketAddress getSocketAddress(final Host host, int port) {
         if(host.getDomainName() != null) {
             return new InetSocketAddress(host.getDomainName().getValue(), port);