import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.File;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Collections;
+import java.net.URL;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.netconf.client.NetconfClientSessionListener;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
+import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.LibraryModulesSchemas;
import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceBuilder;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemasResolverImpl;
+import org.opendaylight.netconf.sal.connect.netconf.SchemalessNetconfDevice;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.sal.connect.netconf.listener.UserPreferences;
import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.netconf.schema.YangLibrarySchemaYangSourceProvider;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.netconf.topology.api.NetconfTopology;
+import org.opendaylight.netconf.topology.api.SchemaRepositoryProvider;
import org.opendaylight.protocol.framework.ReconnectStrategy;
import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapability.CapabilityOrigin;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractNetconfTopology implements NetconfTopology, BindingAwareProvider, Provider {
+public abstract class AbstractNetconfTopology implements NetconfTopology {
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);
protected static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+ protected static final int DEFAULT_CONCURRENT_RPC_LIMIT = 0;
private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L;
* Netconf mount. Access to <code>schemaResourcesDTOs</code> should be surrounded by appropriate
* synchronization locks.
*/
- private static volatile Map<String, NetconfDevice.SchemaResourcesDTO> schemaResourcesDTOs = new HashMap<>();
+ private static final Map<String, NetconfDevice.SchemaResourcesDTO> schemaResourcesDTOs = new HashMap<>();
// Initializes default constant instances for the case when the default schema repository
// directory cache/schema is used.
static {
schemaResourcesDTOs.put(DEFAULT_CACHE_DIRECTORY,
- new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY,
+ new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY,
DEFAULT_SCHEMA_CONTEXT_FACTORY,
- new NetconfStateSchemas.NetconfStateSchemasResolverImpl()));
+ new NetconfStateSchemasResolverImpl()));
DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(DEFAULT_CACHE);
DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(
TextToASTTransformer.create(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY));
protected final ScheduledThreadPool keepaliveExecutor;
protected final ThreadPool processingExecutor;
protected final SharedSchemaRepository sharedSchemaRepository;
+ protected final DataBroker dataBroker;
protected SchemaSourceRegistry schemaRegistry = DEFAULT_SCHEMA_REPOSITORY;
+ protected SchemaRepository schemaRepository = DEFAULT_SCHEMA_REPOSITORY;
protected SchemaContextFactory schemaContextFactory = DEFAULT_SCHEMA_CONTEXT_FACTORY;
- protected DOMMountPointService mountPointService = null;
- protected DataBroker dataBroker = null;
protected final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
protected AbstractNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
- final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
+ final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
+ final DataBroker dataBroker) {
this.topologyId = topologyId;
this.clientDispatcher = clientDispatcher;
this.bindingAwareBroker = bindingAwareBroker;
this.keepaliveExecutor = keepaliveExecutor;
this.processingExecutor = processingExecutor;
this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
- }
-
- protected void registerToSal(BindingAwareProvider baProvider, Provider provider) {
- domBroker.registerProvider(provider);
- bindingAwareBroker.registerProvider(baProvider);
+ this.dataBroker = dataBroker;
}
public void setSchemaRegistry(final SchemaSourceRegistry schemaRegistry) {
}
@Override
- public abstract void onSessionInitiated(ProviderContext session);
-
- @Override
- public String getTopologyId() {
- return topologyId;
- }
-
- @Override
- public DataBroker getDataBroker() {
- return dataBroker;
- }
-
- @Override
- public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
+ public ListenableFuture<NetconfDeviceCapabilities> connectNode(final NodeId nodeId, final Node configNode) {
LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, configNode);
return setupConnection(nodeId, configNode);
}
@Override
- public ListenableFuture<Void> disconnectNode(NodeId nodeId) {
+ public ListenableFuture<Void> disconnectNode(final NodeId nodeId) {
LOG.debug("Disconnecting RemoteDevice{{}}", nodeId.getValue());
if (!activeConnectors.containsKey(nodeId)) {
return Futures.immediateFailedFuture(new IllegalStateException("Unable to disconnect device that is not connected"));
Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
@Override
- public void onSuccess(NetconfDeviceCapabilities result) {
+ public void onSuccess(final NetconfDeviceCapabilities result) {
LOG.debug("Connector for : " + nodeId.getValue() + " started succesfully");
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(final Throwable t) {
LOG.error("Connector for : " + nodeId.getValue() + " failed");
// remove this node from active connectors?
}
final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
- IpAddress ipAddress = node.getHost().getIpAddress();
- InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
+ final IpAddress ipAddress = node.getHost().getIpAddress();
+ final InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
node.getPort().getValue());
- RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
+ final RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker);
salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay, defaultRequestTimeoutMillis);
}
+ // pre register yang library sources as fallback schemas to schema registry
+ final List<SchemaSourceRegistration<YangTextSchemaSource>> registeredYangLibSources = Lists.newArrayList();
+ if (node.getYangLibrary() != null) {
+ final String yangLibURL = node.getYangLibrary().getYangLibraryUrl().getValue();
+ final String yangLibUsername = node.getYangLibrary().getUsername();
+ final String yangLigPassword = node.getYangLibrary().getPassword();
+
+ final LibraryModulesSchemas libraryModulesSchemas;
+ if(yangLibURL != null) {
+ if(yangLibUsername != null && yangLigPassword != null) {
+ libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword);
+ } else {
+ libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL);
+ }
+
+ for (final Map.Entry<SourceIdentifier, URL> sourceIdentifierURLEntry : libraryModulesSchemas.getAvailableModels().entrySet()) {
+ registeredYangLibSources.
+ add(schemaRegistry.registerSchemaSource(
+ new YangLibrarySchemaYangSourceProvider(remoteDeviceId, libraryModulesSchemas.getAvailableModels()),
+ PotentialSchemaSource
+ .create(sourceIdentifierURLEntry.getKey(), YangTextSchemaSource.class,
+ PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+ }
+ }
+ }
+
final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
+ final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> device;
+ if (node.isSchemaless()) {
+ device = new SchemalessNetconfDevice(remoteDeviceId, salFacade);
+ } else {
+ device = new NetconfDeviceBuilder()
+ .setReconnectOnSchemasChange(reconnectOnChangedSchema)
+ .setSchemaResourcesDTO(schemaResourcesDTO)
+ .setGlobalProcessingExecutor(processingExecutor.getExecutor())
+ .setId(remoteDeviceId)
+ .setSalFacade(salFacade)
+ .build();
+ }
- final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
- processingExecutor.getExecutor(), reconnectOnChangedSchema);
+ final Optional<UserPreferences> userCapabilities = getUserCapabilities(node);
+ final int rpcMessageLimit =
+ node.getConcurrentRpcLimit() == null ? DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
- final Optional<NetconfSessionPreferences> userCapabilities = getUserCapabilities(node);
+ if (rpcMessageLimit < 1) {
+ LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
+ }
- return new NetconfConnectorDTO(
- userCapabilities.isPresent() ?
- new NetconfDeviceCommunicator(
- remoteDeviceId, device, new UserPreferences(userCapabilities.get(), node.getYangModuleCapabilities().isOverride())):
- new NetconfDeviceCommunicator(remoteDeviceId, device)
- , salFacade);
+ return new NetconfConnectorDTO(userCapabilities.isPresent()
+ ? new NetconfDeviceCommunicator(remoteDeviceId, device, userCapabilities.get(), rpcMessageLimit)
+ : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
}
protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
}
if (schemaResourcesDTO == null) {
- schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory,
- new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+ schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaRepository, schemaContextFactory,
+ new NetconfStateSchemasResolverImpl());
}
return schemaResourcesDTO;
final FilesystemSchemaSourceCache<YangTextSchemaSource> deviceCache =
createDeviceFilesystemCache(moduleSchemaCacheDirectory);
repository.registerSchemaSourceListener(deviceCache);
- return new NetconfDevice.SchemaResourcesDTO(repository, schemaContextFactory,
- new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+ return new NetconfDevice.SchemaResourcesDTO(repository, repository, schemaContextFactory,
+ new NetconfStateSchemasResolverImpl());
}
/**
return new FilesystemSchemaSourceCache<>(schemaRegistry, YangTextSchemaSource.class, new File(relativeSchemaCacheDirectory));
}
- public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, NetconfNode node) {
+ public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, final NetconfNode node) {
//setup default values since default value is not supported in mdsal
final long clientConnectionTimeoutMillis = node.getConnectionTimeoutMillis() == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : node.getConnectionTimeoutMillis();
protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker);
- @Override
- public abstract ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
-
- @Override
- public void onSessionInitiated(ProviderSession session) {
- mountPointService = session.getService(DOMMountPointService.class);
- }
-
- @Override
- public Collection<ProviderFunctionality> getProviderFunctionality() {
- return Collections.emptySet();
- }
-
- private InetSocketAddress getSocketAddress(final Host host, int port) {
+ private InetSocketAddress getSocketAddress(final Host host, final int port) {
if(host.getDomainName() != null) {
return new InetSocketAddress(host.getDomainName().getValue(), port);
} else {
}
}
- private Optional<NetconfSessionPreferences> getUserCapabilities(final NetconfNode node) {
- if(node.getYangModuleCapabilities() == null) {
+ private Optional<UserPreferences> getUserCapabilities(final NetconfNode node) {
+ // if none of yang-module-capabilities or non-module-capabilities is specified
+ // just return absent
+ if (node.getYangModuleCapabilities() == null && node.getNonModuleCapabilities() == null) {
return Optional.absent();
}
- final List<String> capabilities = node.getYangModuleCapabilities().getCapability();
- if(capabilities == null || capabilities.isEmpty()) {
- return Optional.absent();
+ final List<String> capabilities = new ArrayList<>();
+
+ boolean overrideYangModuleCaps = false;
+ if (node.getYangModuleCapabilities() != null) {
+ capabilities.addAll(node.getYangModuleCapabilities().getCapability());
+ overrideYangModuleCaps = node.getYangModuleCapabilities().isOverride();
}
- final NetconfSessionPreferences parsedOverrideCapabilities = NetconfSessionPreferences.fromStrings(capabilities);
- Preconditions.checkState(parsedOverrideCapabilities.getNonModuleCaps().isEmpty(), "Capabilities to override can " +
- "only contain module based capabilities, non-module capabilities will be retrieved from the device," +
- " configured non-module capabilities: " + parsedOverrideCapabilities.getNonModuleCaps());
+ //non-module capabilities should not exist in yang module capabilities
+ final NetconfSessionPreferences netconfSessionPreferences = NetconfSessionPreferences.fromStrings(capabilities);
+ Preconditions.checkState(netconfSessionPreferences.getNonModuleCaps().isEmpty(), "List yang-module-capabilities/capability " +
+ "should contain only module based capabilities. Non-module capabilities used: " +
+ netconfSessionPreferences.getNonModuleCaps());
+
+ boolean overrideNonModuleCaps = false;
+ if (node.getNonModuleCapabilities() != null) {
+ capabilities.addAll(node.getNonModuleCapabilities().getCapability());
+ overrideNonModuleCaps = node.getNonModuleCapabilities().isOverride();
+ }
- return Optional.of(parsedOverrideCapabilities);
+ return Optional.of(new UserPreferences(NetconfSessionPreferences.fromStrings(capabilities, CapabilityOrigin.UserDefined),
+ overrideYangModuleCaps, overrideNonModuleCaps));
}
private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
}
}
- protected static class NetconfConnectorDTO {
+ protected static class NetconfConnectorDTO implements AutoCloseable {
private final NetconfDeviceCommunicator communicator;
private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
public NetconfClientSessionListener getSessionListener() {
return communicator;
}
- }
+ @Override
+ public void close() {
+ communicator.close();
+ facade.close();
+ }
+ }
}