Merge "Add unit tests for sal-netconf-connector transactions"
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / AbstractNetconfTopology.java
index 777adcf405f6fe8fd73ff2f1fd0f7d86e3364175..7174c53be544c628a4d85eb510c870d7a18ea751 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.netconf.topology;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -18,6 +19,7 @@ import io.netty.util.concurrent.EventExecutor;
 import java.io.File;
 import java.math.BigDecimal;
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,6 +35,7 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
@@ -40,29 +43,37 @@ import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurati
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
+import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.LibraryModulesSchemas;
 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceBuilder;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemasResolverImpl;
+import org.opendaylight.netconf.sal.connect.netconf.SchemalessNetconfDevice;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.listener.UserPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.netconf.schema.YangLibrarySchemaYangSourceProvider;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapability.CapabilityOrigin;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
@@ -77,6 +88,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     protected static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
     protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
     protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+    protected static final int DEFAULT_CONCURRENT_RPC_LIMIT = 0;
     private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
     private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
     private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L;
@@ -136,9 +148,9 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     // directory cache/schema is used.
     static {
         schemaResourcesDTOs.put(DEFAULT_CACHE_DIRECTORY,
-                new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY,
+                new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY,
                         DEFAULT_SCHEMA_CONTEXT_FACTORY,
-                        new NetconfStateSchemas.NetconfStateSchemasResolverImpl()));
+                        new NetconfStateSchemasResolverImpl()));
         DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(DEFAULT_CACHE);
         DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(
                 TextToASTTransformer.create(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY));
@@ -154,6 +166,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     protected final SharedSchemaRepository sharedSchemaRepository;
 
     protected SchemaSourceRegistry schemaRegistry = DEFAULT_SCHEMA_REPOSITORY;
+    protected SchemaRepository schemaRepository = DEFAULT_SCHEMA_REPOSITORY;
     protected SchemaContextFactory schemaContextFactory = DEFAULT_SCHEMA_CONTEXT_FACTORY;
 
     protected DOMMountPointService mountPointService = null;
@@ -190,16 +203,6 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     @Override
     public abstract void onSessionInitiated(ProviderContext session);
 
-    @Override
-    public String getTopologyId() {
-        return topologyId;
-    }
-
-    @Override
-    public DataBroker getDataBroker() {
-        return dataBroker;
-    }
-
     @Override
     public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
         LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, configNode);
@@ -254,8 +257,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
 
     protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
                                                          final NetconfNode node) {
-        //setup default values since default value is not supported yet in mdsal
-        // TODO remove this when mdsal starts supporting default values
+        //setup default values since default value is not supported in mdsal
         final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
         final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
         final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
@@ -267,26 +269,66 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
 
         RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
-                createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+                createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker);
 
         if (keepaliveDelay > 0) {
             LOG.warn("Adding keepalive facade, for device {}", nodeId);
-            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
+            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
         }
 
-        final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
+        // pre register yang library sources as fallback schemas to schema registry
+        List<SchemaSourceRegistration<YangTextSchemaSource>> registeredYangLibSources = Lists.newArrayList();
+        if (node.getYangLibrary() != null) {
+            final String yangLibURL = node.getYangLibrary().getYangLibraryUrl().getValue();
+            final String yangLibUsername = node.getYangLibrary().getUsername();
+            final String yangLigPassword = node.getYangLibrary().getPassword();
+
+            LibraryModulesSchemas libraryModulesSchemas;
+            if(yangLibURL != null) {
+                if(yangLibUsername != null && yangLigPassword != null) {
+                    libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword);
+                } else {
+                    libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL);
+                }
 
-        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
-                processingExecutor.getExecutor(), reconnectOnChangedSchema);
+                for (Map.Entry<SourceIdentifier, URL> sourceIdentifierURLEntry : libraryModulesSchemas.getAvailableModels().entrySet()) {
+                    registeredYangLibSources.
+                            add(schemaRegistry.registerSchemaSource(
+                                    new YangLibrarySchemaYangSourceProvider(remoteDeviceId, libraryModulesSchemas.getAvailableModels()),
+                                    PotentialSchemaSource
+                                            .create(sourceIdentifierURLEntry.getKey(), YangTextSchemaSource.class,
+                                            PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+                }
+            }
+        }
+
+        final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
+        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> device;
+        if (node.isSchemaless()) {
+            device = new SchemalessNetconfDevice(remoteDeviceId, salFacade);
+        } else {
+            device = new NetconfDeviceBuilder()
+                    .setReconnectOnSchemasChange(reconnectOnChangedSchema)
+                    .setSchemaResourcesDTO(schemaResourcesDTO)
+                    .setGlobalProcessingExecutor(processingExecutor.getExecutor())
+                    .setId(remoteDeviceId)
+                    .setSalFacade(salFacade)
+                    .build();
+        }
 
         final Optional<NetconfSessionPreferences> userCapabilities = getUserCapabilities(node);
+        final int rpcMessageLimit =
+                node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
+
+        if (rpcMessageLimit < 1) {
+            LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
+        }
 
         return new NetconfConnectorDTO(
                 userCapabilities.isPresent() ?
                         new NetconfDeviceCommunicator(
-                                remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride())):
-                        new NetconfDeviceCommunicator(remoteDeviceId, device)
-                , salFacade);
+                                remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride()), rpcMessageLimit):
+                        new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
     }
 
     protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
@@ -320,8 +362,8 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         }
 
         if (schemaResourcesDTO == null) {
-            schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory,
-                    new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+            schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaRepository, schemaContextFactory,
+                    new NetconfStateSchemasResolverImpl());
         }
 
         return schemaResourcesDTO;
@@ -342,8 +384,8 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         final FilesystemSchemaSourceCache<YangTextSchemaSource> deviceCache =
                 createDeviceFilesystemCache(moduleSchemaCacheDirectory);
         repository.registerSchemaSourceListener(deviceCache);
-        return new NetconfDevice.SchemaResourcesDTO(repository, schemaContextFactory,
-                new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+        return new NetconfDevice.SchemaResourcesDTO(repository, repository, schemaContextFactory,
+                new NetconfStateSchemasResolverImpl());
     }
 
     /**
@@ -359,8 +401,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
 
     public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, NetconfNode node) {
 
-        //setup default values since default value is not supported yet in mdsal
-        // TODO remove this when mdsal starts supporting default values
+        //setup default values since default value is not supported in mdsal
         final long clientConnectionTimeoutMillis = node.getConnectionTimeoutMillis() == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : node.getConnectionTimeoutMillis();
         final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
         final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
@@ -395,10 +436,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
                 .build();
     }
 
-    protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis);
-
-    @Override
-    public abstract ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
+    protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker);
 
     @Override
     public void onSessionInitiated(ProviderSession session) {
@@ -430,7 +468,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
             return Optional.absent();
         }
 
-        final NetconfSessionPreferences parsedOverrideCapabilities = NetconfSessionPreferences.fromStrings(capabilities);
+        final NetconfSessionPreferences parsedOverrideCapabilities = NetconfSessionPreferences.fromStrings(capabilities, CapabilityOrigin.UserDefined);
         Preconditions.checkState(parsedOverrideCapabilities.getNonModuleCaps().isEmpty(), "Capabilities to override can " +
                 "only contain module based capabilities, non-module capabilities will be retrieved from the device," +
                 " configured non-module capabilities: " + parsedOverrideCapabilities.getNonModuleCaps());