import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.util.concurrent.EventExecutor;
-import java.io.File;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.aaa.encrypt.AAAEncryptionService;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
import org.opendaylight.netconf.sal.connect.netconf.LibraryModulesSchemas;
import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO;
import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceBuilder;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemasResolverImpl;
import org.opendaylight.netconf.sal.connect.netconf.SchemalessNetconfDevice;
import org.opendaylight.netconf.sal.connect.netconf.auth.DatastoreBackedPublicKeyAuth;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.sal.connect.util.SslHandlerFactoryImpl;
import org.opendaylight.netconf.topology.api.NetconfTopology;
-import org.opendaylight.netconf.topology.api.SchemaRepositoryProvider;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev190614.NetconfNodeAugmentedOptional;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.parameters.Protocol;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.key.auth.KeyBased;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.login.pw.LoginPassword;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.login.pw.unencrypted.LoginPasswordUnencrypted;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.schema.storage.YangLibrary;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactoryConfiguration;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
-import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
-import org.opendaylight.yangtools.yang.model.repo.util.InMemorySchemaSourceCache;
-import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
-import org.opendaylight.yangtools.yang.parser.rfc7950.repo.ASTSchemaSource;
-import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractNetconfTopology implements NetconfTopology {
-
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);
protected static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L;
private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
- // constants related to Schema Cache(s)
- /**
- * Filesystem based caches are stored relative to the cache directory.
- */
- private static final String CACHE_DIRECTORY = "cache";
-
- /**
- * The default cache directory relative to <code>CACHE_DIRECTORY</code>.
- */
- private static final String DEFAULT_CACHE_DIRECTORY = "schema";
-
- /**
- * The qualified schema cache directory <code>cache/schema</code>.
- */
- private static final String QUALIFIED_DEFAULT_CACHE_DIRECTORY =
- CACHE_DIRECTORY + File.separator + DEFAULT_CACHE_DIRECTORY;
-
- /**
- * The name for the default schema repository.
- */
- private static final String DEFAULT_SCHEMA_REPOSITORY_NAME = "sal-netconf-connector";
-
- /**
- * The default schema repository in the case that one is not specified.
- */
- private static final SharedSchemaRepository DEFAULT_SCHEMA_REPOSITORY =
- new SharedSchemaRepository(DEFAULT_SCHEMA_REPOSITORY_NAME);
-
- public static final InMemorySchemaSourceCache<ASTSchemaSource> DEFAULT_AST_CACHE =
- InMemorySchemaSourceCache.createSoftCache(DEFAULT_SCHEMA_REPOSITORY, ASTSchemaSource.class);
-
- /**
- * The default factory for creating <code>SchemaContext</code> instances.
- */
- private static final EffectiveModelContextFactory DEFAULT_SCHEMA_CONTEXT_FACTORY =
- DEFAULT_SCHEMA_REPOSITORY.createEffectiveModelContextFactory(
- SchemaContextFactoryConfiguration.getDefault());
-
- /**
- * Keeps track of initialized Schema resources. A Map is maintained in which the key represents the name
- * of the schema cache directory, and the value is a corresponding <code>SchemaResourcesDTO</code>. The
- * <code>SchemaResourcesDTO</code> is essentially a container that allows for the extraction of the
- * <code>SchemaRegistry</code> and <code>SchemaContextFactory</code> which should be used for a particular
- * Netconf mount. Access to <code>SCHEMA_RESOURCES_DTO_MAP</code> should be surrounded by appropriate
- * synchronization locks.
- */
- private static final Map<String, NetconfDevice.SchemaResourcesDTO> SCHEMA_RESOURCES_DTO_MAP = new HashMap<>();
-
- // Initializes default constant instances for the case when the default schema repository
- // directory cache/schema is used.
- static {
- SCHEMA_RESOURCES_DTO_MAP.put(DEFAULT_CACHE_DIRECTORY,
- new NetconfDevice.SchemaResourcesDTO(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY,
- DEFAULT_SCHEMA_CONTEXT_FACTORY,
- new NetconfStateSchemasResolverImpl()));
- DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(DEFAULT_AST_CACHE);
- DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(
- TextToASTTransformer.create(DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY));
-
- /*
- * Create the default <code>FilesystemSchemaSourceCache</code>, which stores cached files
- * in <code>cache/schema</code>. Try up to 3 times - we've seen intermittent failures on jenkins where
- * FilesystemSchemaSourceCache throws an IAE due to mkdirs failure. The theory is that there's a race
- * creating the dir and it already exists when mkdirs is called (mkdirs returns false in this case). In this
- * scenario, a retry should succeed.
- */
- int tries = 1;
- while (true) {
- try {
- FilesystemSchemaSourceCache<YangTextSchemaSource> defaultCache =
- new FilesystemSchemaSourceCache<>(DEFAULT_SCHEMA_REPOSITORY, YangTextSchemaSource.class,
- new File(QUALIFIED_DEFAULT_CACHE_DIRECTORY));
- DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(defaultCache);
- break;
- } catch (IllegalArgumentException e) {
- if (tries++ >= 3) {
- LOG.error("Error creating default schema cache", e);
- break;
- }
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
- }
- }
private final NetconfClientDispatcher clientDispatcher;
private final EventExecutor eventExecutor;
private final DeviceActionFactory deviceActionFactory;
private final NetconfKeystoreAdapter keystoreAdapter;
+ private final SchemaResourceManager schemaManager;
+
protected final ScheduledThreadPool keepaliveExecutor;
protected final ListeningExecutorService processingExecutor;
- protected final SharedSchemaRepository sharedSchemaRepository;
protected final DataBroker dataBroker;
protected final DOMMountPointService mountPointService;
protected final String topologyId;
- protected SchemaSourceRegistry schemaRegistry = DEFAULT_SCHEMA_REPOSITORY;
- protected SchemaRepository schemaRepository = DEFAULT_SCHEMA_REPOSITORY;
- protected SchemaContextFactory schemaContextFactory = DEFAULT_SCHEMA_CONTEXT_FACTORY;
protected String privateKeyPath;
protected String privateKeyPassphrase;
protected final AAAEncryptionService encryptionService;
protected AbstractNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
- final ThreadPool processingExecutor,
- final SchemaRepositoryProvider schemaRepositoryProvider,
+ final ThreadPool processingExecutor, final SchemaResourceManager schemaManager,
final DataBroker dataBroker, final DOMMountPointService mountPointService,
final AAAEncryptionService encryptionService,
final DeviceActionFactory deviceActionFactory) {
this.eventExecutor = eventExecutor;
this.keepaliveExecutor = keepaliveExecutor;
this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
+ this.schemaManager = requireNonNull(schemaManager);
this.deviceActionFactory = deviceActionFactory;
- this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
this.dataBroker = dataBroker;
this.mountPointService = mountPointService;
this.encryptionService = encryptionService;
this.keystoreAdapter = new NetconfKeystoreAdapter(dataBroker);
}
- public void setSchemaRegistry(final SchemaSourceRegistry schemaRegistry) {
- this.schemaRegistry = schemaRegistry;
- }
-
- public void setSchemaContextFactory(final SchemaContextFactory schemaContextFactory) {
- this.schemaContextFactory = schemaContextFactory;
- }
-
@Override
public ListenableFuture<NetconfDeviceCapabilities> connectNode(final NodeId nodeId, final Node configNode) {
LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, hideCredentials(configNode));
? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis().toJava();
final long keepaliveDelay = node.getKeepaliveDelay() == null
? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay().toJava();
- final boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null
- ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
final IpAddress ipAddress = node.getHost().getIpAddress();
final InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null
node.getPort().getValue().toJava());
final RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
- RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
- createSalFacade(remoteDeviceId);
+ RemoteDeviceHandler<NetconfSessionPreferences> salFacade = createSalFacade(remoteDeviceId);
if (keepaliveDelay > 0) {
LOG.warn("Adding keepalive facade, for device {}", nodeId);
keepaliveDelay, defaultRequestTimeoutMillis);
}
- // pre register yang library sources as fallback schemas to schema registry
- final List<SchemaSourceRegistration<YangTextSchemaSource>> registeredYangLibSources = Lists.newArrayList();
- if (node.getYangLibrary() != null) {
- final String yangLibURL = node.getYangLibrary().getYangLibraryUrl().getValue();
- final String yangLibUsername = node.getYangLibrary().getUsername();
- final String yangLigPassword = node.getYangLibrary().getPassword();
-
- final LibraryModulesSchemas libraryModulesSchemas;
- if (yangLibURL != null) {
- if (yangLibUsername != null && yangLigPassword != null) {
- libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword);
- } else {
- libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL);
- }
-
- for (final Map.Entry<SourceIdentifier, URL> sourceIdentifierURLEntry
- : libraryModulesSchemas.getAvailableModels().entrySet()) {
- registeredYangLibSources
- .add(schemaRegistry.registerSchemaSource(
- new YangLibrarySchemaYangSourceProvider(remoteDeviceId,
- libraryModulesSchemas.getAvailableModels()),
- PotentialSchemaSource.create(sourceIdentifierURLEntry.getKey(),
- YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
- }
- }
- }
-
- final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> device;
if (node.isSchemaless()) {
device = new SchemalessNetconfDevice(remoteDeviceId, salFacade);
} else {
- NetconfDeviceBuilder netconfDeviceBuilder = new NetconfDeviceBuilder()
- .setReconnectOnSchemasChange(reconnectOnChangedSchema)
- .setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(this.processingExecutor)
- .setId(remoteDeviceId)
- .setSalFacade(salFacade)
- .setNode(node)
- .setEventExecutor(eventExecutor)
- .setNodeOptional(nodeOptional)
- .setDeviceActionFactory(deviceActionFactory);
- device = netconfDeviceBuilder.build();
+ device = createNetconfDevice(remoteDeviceId, salFacade, nodeId, node, nodeOptional);
}
final Optional<UserPreferences> userCapabilities = getUserCapabilities(node);
return new NetconfConnectorDTO(netconfDeviceCommunicator, salFacade);
}
- protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
- // Setup information related to the SchemaRegistry, SchemaResourceFactory, etc.
- NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = null;
- final String moduleSchemaCacheDirectory = node.getSchemaCacheDirectory();
- // Only checks to ensure the String is not empty or null; further checks related to directory
- // accessibility and file permissionsare handled during the FilesystemSchemaSourceCache initialization.
- if (!Strings.isNullOrEmpty(moduleSchemaCacheDirectory)) {
- // If a custom schema cache directory is specified, create the backing DTO; otherwise,
- // the SchemaRegistry and SchemaContextFactory remain the default values.
- if (!moduleSchemaCacheDirectory.equals(DEFAULT_CACHE_DIRECTORY)) {
- // Multiple modules may be created at once;
- // synchronize to avoid issues with data consistency among threads.
- synchronized (SCHEMA_RESOURCES_DTO_MAP) {
- // Look for the cached DTO to reuse SchemaRegistry and SchemaContextFactory variables
- // if they already exist
- schemaResourcesDTO = SCHEMA_RESOURCES_DTO_MAP.get(moduleSchemaCacheDirectory);
- if (schemaResourcesDTO == null) {
- schemaResourcesDTO = createSchemaResourcesDTO(moduleSchemaCacheDirectory);
- schemaResourcesDTO.getSchemaRegistry().registerSchemaSourceListener(
- TextToASTTransformer.create((SchemaRepository) schemaResourcesDTO.getSchemaRegistry(),
- schemaResourcesDTO.getSchemaRegistry())
- );
- SCHEMA_RESOURCES_DTO_MAP.put(moduleSchemaCacheDirectory, schemaResourcesDTO);
- }
- }
- LOG.info("Netconf connector for device {} will use schema cache directory {} instead of {}",
- nodeId.getValue(), moduleSchemaCacheDirectory, DEFAULT_CACHE_DIRECTORY);
- }
- } else {
- LOG.warn("schema-cache-directory for {} is null or empty; using the default {}",
- nodeId.getValue(), QUALIFIED_DEFAULT_CACHE_DIRECTORY);
- }
+ private NetconfDevice createNetconfDevice(final RemoteDeviceId remoteDeviceId,
+ final RemoteDeviceHandler<NetconfSessionPreferences> salFacade, final NodeId nodeId, final NetconfNode node,
+ final NetconfNodeAugmentedOptional nodeOptional) {
+ final boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null
+ ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
- if (schemaResourcesDTO == null) {
- schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaRepository,
- schemaContextFactory, new NetconfStateSchemasResolverImpl());
- }
+ final SchemaResourcesDTO resources = schemaManager.getSchemaResources(node, nodeId.getValue());
+
+ final NetconfDevice device = new NetconfDeviceBuilder()
+ .setReconnectOnSchemasChange(reconnectOnChangedSchema)
+ .setSchemaResourcesDTO(resources)
+ .setGlobalProcessingExecutor(this.processingExecutor)
+ .setId(remoteDeviceId)
+ .setSalFacade(salFacade)
+ .setNode(node)
+ .setEventExecutor(eventExecutor)
+ .setNodeOptional(nodeOptional)
+ .setDeviceActionFactory(deviceActionFactory)
+ .build();
- return schemaResourcesDTO;
- }
+ final YangLibrary yangLibrary = node.getYangLibrary();
+ if (yangLibrary != null) {
+ final Uri uri = yangLibrary.getYangLibraryUrl();
+ if (uri != null) {
+ // FIXME: NETCONF-675: these registrations need to be torn down with the device. This does not look
+ // quite right, though, as we can end up adding a lot of registrations on a
+ // per-device basis.
+ // This leak is also detected by SpotBugs as soon as this initialization is switched
+ // to proper "new ArrayList<>" and hence we really need to attach these somewhere
+ // else.
+ // It seems we should be subclassing NetconfConnectorDTO for this purpose as a
+ // first step and then perhaps do some refcounting or similar based on the
+ // schemaRegistry instance.
+ final List<SchemaSourceRegistration<?>> registeredYangLibSources = Lists.newArrayList();
+ final String yangLibURL = uri.getValue();
+ final SchemaSourceRegistry schemaRegistry = resources.getSchemaRegistry();
+
+ // pre register yang library sources as fallback schemas to schema registry
+ final LibraryModulesSchemas schemas;
+ final String yangLibUsername = yangLibrary.getUsername();
+ final String yangLigPassword = yangLibrary.getPassword();
+ if (yangLibUsername != null && yangLigPassword != null) {
+ schemas = LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword);
+ } else {
+ schemas = LibraryModulesSchemas.create(yangLibURL);
+ }
- /**
- * Creates the backing Schema classes for a particular directory.
- *
- * @param moduleSchemaCacheDirectory The string directory relative to "cache"
- * @return A DTO containing the Schema classes for the Netconf mount.
- */
- private NetconfDevice.SchemaResourcesDTO createSchemaResourcesDTO(final String moduleSchemaCacheDirectory) {
- final SharedSchemaRepository repository = new SharedSchemaRepository(moduleSchemaCacheDirectory);
- final EffectiveModelContextFactory contextFactory
- = repository.createEffectiveModelContextFactory(SchemaContextFactoryConfiguration.getDefault());
- setSchemaRegistry(repository);
- setSchemaContextFactory(contextFactory);
- final FilesystemSchemaSourceCache<YangTextSchemaSource> deviceCache =
- createDeviceFilesystemCache(moduleSchemaCacheDirectory);
- repository.registerSchemaSourceListener(deviceCache);
- repository.registerSchemaSourceListener(
- InMemorySchemaSourceCache.createSoftCache(repository, ASTSchemaSource.class));
- return new NetconfDevice.SchemaResourcesDTO(repository, repository, contextFactory,
- new NetconfStateSchemasResolverImpl());
- }
+ for (final Map.Entry<SourceIdentifier, URL> entry : schemas.getAvailableModels().entrySet()) {
+ registeredYangLibSources.add(schemaRegistry.registerSchemaSource(
+ new YangLibrarySchemaYangSourceProvider(remoteDeviceId, schemas.getAvailableModels()),
+ PotentialSchemaSource.create(entry.getKey(), YangTextSchemaSource.class,
+ PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+ }
+ }
+ }
- /**
- * Creates a <code>FilesystemSchemaSourceCache</code> for the custom schema cache directory.
- *
- * @param schemaCacheDirectory The custom cache directory relative to "cache"
- * @return A <code>FilesystemSchemaSourceCache</code> for the custom schema cache directory
- */
- private FilesystemSchemaSourceCache<YangTextSchemaSource> createDeviceFilesystemCache(
- final String schemaCacheDirectory) {
- final String relativeSchemaCacheDirectory = CACHE_DIRECTORY + File.separator + schemaCacheDirectory;
- return new FilesystemSchemaSourceCache<>(schemaRegistry, YangTextSchemaSource.class,
- new File(relativeSchemaCacheDirectory));
+ return device;
}
/**