Close missing resources when netconf-topolgy goes down
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / AbstractNetconfTopology.java
index 7a0d0f7dbb9527e060e304e0739b13e61d1440d4..7ac1014942eaccc477109698ef4628d119ce3cce 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.netconf.topology;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -18,8 +19,7 @@ import io.netty.util.concurrent.EventExecutor;
 import java.io.File;
 import java.math.BigDecimal;
 import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Collections;
+import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,11 +28,8 @@ import org.opendaylight.controller.config.threadpool.ThreadPool;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
@@ -40,29 +37,39 @@ import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurati
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
+import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.LibraryModulesSchemas;
 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceBuilder;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemasResolverImpl;
+import org.opendaylight.netconf.sal.connect.netconf.SchemalessNetconfDevice;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.listener.UserPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.netconf.schema.YangLibrarySchemaYangSourceProvider;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.netconf.topology.api.NetconfTopology;
+import org.opendaylight.netconf.topology.api.SchemaRepositoryProvider;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapability.CapabilityOrigin;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
@@ -70,13 +77,14 @@ import org.opendaylight.yangtools.yang.parser.util.TextToASTTransformer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractNetconfTopology implements NetconfTopology, BindingAwareProvider, Provider {
+public abstract class AbstractNetconfTopology implements NetconfTopology {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);
 
     protected static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
     protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
     protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+    protected static final int DEFAULT_CONCURRENT_RPC_LIMIT = 0;
     private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
     private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
     private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L;
@@ -136,9 +144,9 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     // directory cache/schema is used.
     static {
         schemaResourcesDTOs.put(DEFAULT_CACHE_DIRECTORY,
-                new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY,
+                new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY,
                         DEFAULT_SCHEMA_CONTEXT_FACTORY,
-                        new NetconfStateSchemas.NetconfStateSchemasResolverImpl()));
+                        new NetconfStateSchemasResolverImpl()));
         DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(DEFAULT_CACHE);
         DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(
                 TextToASTTransformer.create(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY));
@@ -152,18 +160,20 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
     protected final ScheduledThreadPool keepaliveExecutor;
     protected final ThreadPool processingExecutor;
     protected final SharedSchemaRepository sharedSchemaRepository;
+    protected final DataBroker dataBroker;
+    protected final DOMMountPointService mountPointService;
 
     protected SchemaSourceRegistry schemaRegistry = DEFAULT_SCHEMA_REPOSITORY;
+    protected SchemaRepository schemaRepository = DEFAULT_SCHEMA_REPOSITORY;
     protected SchemaContextFactory schemaContextFactory = DEFAULT_SCHEMA_CONTEXT_FACTORY;
 
-    protected DOMMountPointService mountPointService = null;
-    protected DataBroker dataBroker = null;
     protected final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
 
     protected AbstractNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
                                       final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
                                       final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
-                                      final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
+                                      final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
+                                      final DataBroker dataBroker, final DOMMountPointService mountPointService) {
         this.topologyId = topologyId;
         this.clientDispatcher = clientDispatcher;
         this.bindingAwareBroker = bindingAwareBroker;
@@ -172,11 +182,8 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         this.keepaliveExecutor = keepaliveExecutor;
         this.processingExecutor = processingExecutor;
         this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
-    }
-
-    protected void registerToSal(BindingAwareProvider baProvider, Provider provider) {
-        domBroker.registerProvider(provider);
-        bindingAwareBroker.registerProvider(baProvider);
+        this.dataBroker = dataBroker;
+        this.mountPointService = mountPointService;
     }
 
     public void setSchemaRegistry(final SchemaSourceRegistry schemaRegistry) {
@@ -187,19 +194,6 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         this.schemaContextFactory = schemaContextFactory;
     }
 
-    @Override
-    public abstract void onSessionInitiated(ProviderContext session);
-
-    @Override
-    public String getTopologyId() {
-        return topologyId;
-    }
-
-    @Override
-    public DataBroker getDataBroker() {
-        return dataBroker;
-    }
-
     @Override
     public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
         LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, configNode);
@@ -273,19 +267,59 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
             salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
         }
 
-        final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
+        // pre register yang library sources as fallback schemas to schema registry
+        List<SchemaSourceRegistration<YangTextSchemaSource>> registeredYangLibSources = Lists.newArrayList();
+        if (node.getYangLibrary() != null) {
+            final String yangLibURL = node.getYangLibrary().getYangLibraryUrl().getValue();
+            final String yangLibUsername = node.getYangLibrary().getUsername();
+            final String yangLigPassword = node.getYangLibrary().getPassword();
+
+            LibraryModulesSchemas libraryModulesSchemas;
+            if(yangLibURL != null) {
+                if(yangLibUsername != null && yangLigPassword != null) {
+                    libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword);
+                } else {
+                    libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL);
+                }
+
+                for (Map.Entry<SourceIdentifier, URL> sourceIdentifierURLEntry : libraryModulesSchemas.getAvailableModels().entrySet()) {
+                    registeredYangLibSources.
+                            add(schemaRegistry.registerSchemaSource(
+                                    new YangLibrarySchemaYangSourceProvider(remoteDeviceId, libraryModulesSchemas.getAvailableModels()),
+                                    PotentialSchemaSource
+                                            .create(sourceIdentifierURLEntry.getKey(), YangTextSchemaSource.class,
+                                            PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+                }
+            }
+        }
 
-        final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
-                processingExecutor.getExecutor(), reconnectOnChangedSchema);
+        final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
+        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> device;
+        if (node.isSchemaless()) {
+            device = new SchemalessNetconfDevice(remoteDeviceId, salFacade);
+        } else {
+            device = new NetconfDeviceBuilder()
+                    .setReconnectOnSchemasChange(reconnectOnChangedSchema)
+                    .setSchemaResourcesDTO(schemaResourcesDTO)
+                    .setGlobalProcessingExecutor(processingExecutor.getExecutor())
+                    .setId(remoteDeviceId)
+                    .setSalFacade(salFacade)
+                    .build();
+        }
 
         final Optional<NetconfSessionPreferences> userCapabilities = getUserCapabilities(node);
+        final int rpcMessageLimit =
+                node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
+
+        if (rpcMessageLimit < 1) {
+            LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
+        }
 
         return new NetconfConnectorDTO(
                 userCapabilities.isPresent() ?
                         new NetconfDeviceCommunicator(
-                                remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride())):
-                        new NetconfDeviceCommunicator(remoteDeviceId, device)
-                , salFacade);
+                                remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride()), rpcMessageLimit):
+                        new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
     }
 
     protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
@@ -319,8 +353,8 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         }
 
         if (schemaResourcesDTO == null) {
-            schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory,
-                    new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+            schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaRepository, schemaContextFactory,
+                    new NetconfStateSchemasResolverImpl());
         }
 
         return schemaResourcesDTO;
@@ -341,8 +375,8 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         final FilesystemSchemaSourceCache<YangTextSchemaSource> deviceCache =
                 createDeviceFilesystemCache(moduleSchemaCacheDirectory);
         repository.registerSchemaSourceListener(deviceCache);
-        return new NetconfDevice.SchemaResourcesDTO(repository, schemaContextFactory,
-                new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+        return new NetconfDevice.SchemaResourcesDTO(repository, repository, schemaContextFactory,
+                new NetconfStateSchemasResolverImpl());
     }
 
     /**
@@ -395,19 +429,6 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
 
     protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker);
 
-    @Override
-    public abstract ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
-
-    @Override
-    public void onSessionInitiated(ProviderSession session) {
-         mountPointService = session.getService(DOMMountPointService.class);
-    }
-
-    @Override
-    public Collection<ProviderFunctionality> getProviderFunctionality() {
-        return Collections.emptySet();
-    }
-
     private InetSocketAddress getSocketAddress(final Host host, int port) {
         if(host.getDomainName() != null) {
             return new InetSocketAddress(host.getDomainName().getValue(), port);
@@ -428,7 +449,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
             return Optional.absent();
         }
 
-        final NetconfSessionPreferences parsedOverrideCapabilities = NetconfSessionPreferences.fromStrings(capabilities);
+        final NetconfSessionPreferences parsedOverrideCapabilities = NetconfSessionPreferences.fromStrings(capabilities, CapabilityOrigin.UserDefined);
         Preconditions.checkState(parsedOverrideCapabilities.getNonModuleCaps().isEmpty(), "Capabilities to override can " +
                 "only contain module based capabilities, non-module capabilities will be retrieved from the device," +
                 " configured non-module capabilities: " + parsedOverrideCapabilities.getNonModuleCaps());
@@ -464,7 +485,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         }
     }
 
-    protected static class NetconfConnectorDTO {
+    protected static class NetconfConnectorDTO implements AutoCloseable {
 
         private final NetconfDeviceCommunicator communicator;
         private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
@@ -485,6 +506,12 @@ public abstract class AbstractNetconfTopology implements NetconfTopology, Bindin
         public NetconfClientSessionListener getSessionListener() {
             return communicator;
         }
+
+        @Override
+        public void close() {
+            communicator.close();
+            facade.close();
+        }
     }
 
 }