X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.java;h=cc9eb5a851271c8ed221d94038b9db5e35f92059;hp=350132cf99a5dfb05681b0732912e7a4220335d2;hb=971b179000ef1cc56699de35061cf6f97d4cf36f;hpb=5957a742796f6a1426e406a54002d9decc7a7188 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index 350132cf99..cc9eb5a851 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -7,41 +7,47 @@ */ package org.opendaylight.controller.sal.connect.netconf; -import java.io.InputStream; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; - import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.controller.sal.connect.api.MessageTransformer; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; -import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory; -import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory; import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; -import org.opendaylight.controller.sal.connect.netconf.schema.NetconfDeviceSchemaProviderFactory; -import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaSourceProvider; -import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer; +import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; -import org.opendaylight.controller.sal.core.api.RpcImplementation; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; -import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider; -import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation; +import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; +import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - /** * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade */ @@ -49,51 +55,33 @@ public final class NetconfDevice implements RemoteDevice QNAME_TO_SOURCE_ID_FUNCTION = new Function() { + @Override + public SourceIdentifier apply(final QName input) { + return new SourceIdentifier(input.getLocalName(), Optional.fromNullable(input.getFormattedRevision())); + } + }; + private final RemoteDeviceId id; + private final SchemaContextFactory schemaContextFactory; private final RemoteDeviceHandler salFacade; private final ListeningExecutorService processingExecutor; + private final SchemaSourceRegistry schemaRegistry; private final MessageTransformer messageTransformer; - private final SchemaContextProviderFactory schemaContextProviderFactory; - private final SchemaSourceProviderFactory sourceProviderFactory; private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; private final NotificationHandler notificationHandler; + private final List> sourceRegistrations = Lists.newArrayList(); - public static NetconfDevice createNetconfDevice(final RemoteDeviceId id, - final AbstractCachingSchemaSourceProvider schemaSourceProvider, - final ExecutorService executor, final RemoteDeviceHandler salFacade) { - return createNetconfDevice(id, schemaSourceProvider, executor, salFacade, new NetconfStateSchemas.NetconfStateSchemasResolverImpl()); - } - - @VisibleForTesting - protected static NetconfDevice createNetconfDevice(final RemoteDeviceId id, - final AbstractCachingSchemaSourceProvider schemaSourceProvider, - final ExecutorService executor, final RemoteDeviceHandler salFacade, - final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { - - return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(), - new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory() { - @Override - public SchemaSourceProvider createSourceProvider(final RpcImplementation deviceRpc) { - return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id, - deviceRpc)); - } - }, stateSchemasResolver); - } - - @VisibleForTesting - protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ExecutorService processingExecutor, final MessageTransformer messageTransformer, - final SchemaContextProviderFactory schemaContextProviderFactory, - final SchemaSourceProviderFactory sourceProviderFactory, - final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { + public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ExecutorService globalProcessingExecutor, final MessageTransformer messageTransformer) { this.id = id; + this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry(); this.messageTransformer = messageTransformer; + this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory(); this.salFacade = salFacade; - this.sourceProviderFactory = sourceProviderFactory; - this.stateSchemasResolver = stateSchemasResolver; - this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor); - this.schemaContextProviderFactory = schemaContextProviderFactory; + this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver(); + this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor); this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id); } @@ -107,60 +95,73 @@ public final class NetconfDevice implements RemoteDevice salInitializationFuture = processingExecutor.submit(new Runnable() { + final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(listener); + + final DeviceSourcesResolver task = new DeviceSourcesResolver(deviceRpc, remoteSessionCapabilities, id, stateSchemasResolver); + final ListenableFuture sourceResolverFuture = processingExecutor.submit(task); + + final FutureCallback resolvedSourceCallback = new FutureCallback() { @Override - public void run() { - final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener); - - final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id); - logger.warn("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames()); - // TODO use this for shared schema context - - final SchemaSourceProvider delegate = sourceProviderFactory.createSourceProvider(deviceRpc); - final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities); - updateMessageTransformer(schemaContextProvider); - salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc); - notificationHandler.onRemoteSchemaUp(); + public void onSuccess(final DeviceSources result) { + addProvidedSourcesToSchemaRegistry(deviceRpc, result); + setUpSchema(result); } - }); - Futures.addCallback(salInitializationFuture, new FutureCallback() { - @Override - public void onSuccess(final Object result) { - logger.debug("{}: Initialization in sal successful", id); - logger.info("{}: Netconf connector initialized successfully", id); + private void setUpSchema(final DeviceSources result) { + processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, deviceRpc, listener)); } @Override public void onFailure(final Throwable t) { - // Unable to initialize device, set as disconnected - logger.error("{}: Initialization failed", id, t); - salFacade.onDeviceDisconnected(); - // TODO ssh connection is still open if sal initialization fails + logger.warn("{}: Unexpected error resolving device sources: {}", id, t); + handleSalInitializationFailure(t, listener); } - }); + }; + + Futures.addCallback(sourceResolverFuture, resolvedSourceCallback); + } + + private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionCapabilities remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) { + updateMessageTransformer(result); + salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc); + notificationHandler.onRemoteSchemaUp(); + + logger.debug("{}: Initialization in sal successful", id); + logger.info("{}: Netconf connector initialized successfully", id); + } + + private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator listener) { + logger.error("{}: Initialization in sal failed, disconnecting from device", id, t); + listener.close(); + onRemoteSessionDown(); } /** * Update initial message transformer to use retrieved schema + * @param currentSchemaContext */ - private void updateMessageTransformer(final SchemaContextProvider schemaContextProvider) { - messageTransformer.onGlobalContextUpdated(schemaContextProvider.getSchemaContext()); + private void updateMessageTransformer(final SchemaContext currentSchemaContext) { + messageTransformer.onGlobalContextUpdated(currentSchemaContext); } - private SchemaContextProvider setUpSchemaContext(final SchemaSourceProvider sourceProvider, final NetconfSessionCapabilities capabilities) { - return schemaContextProviderFactory.createContextProvider(capabilities.getModuleBasedCaps(), sourceProvider); + private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) { + final NetconfRemoteSchemaYangSourceProvider yangProvider = new NetconfRemoteSchemaYangSourceProvider(id, deviceRpc); + for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) { + sourceRegistrations.add(schemaRegistry.registerSchemaSource(yangProvider, + PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))); + } } - private NetconfDeviceRpc setUpDeviceRpc(final NetconfSessionCapabilities capHolder, final RemoteDeviceCommunicator listener) { - Preconditions.checkArgument(capHolder.isMonitoringSupported(), - "%s: Netconf device does not support netconf monitoring, yang schemas cannot be acquired. Netconf device capabilities", capHolder); - return new NetconfDeviceRpc(listener, messageTransformer); + private NetconfDeviceRpc setUpDeviceRpc(final RemoteDeviceCommunicator listener) { + return new NetconfDeviceRpc(listener, messageTransformer); } @Override public void onRemoteSessionDown() { salFacade.onDeviceDisconnected(); + for (final SchemaSourceRegistration sourceRegistration : sourceRegistrations) { + sourceRegistration.close(); + } } @Override @@ -169,59 +170,181 @@ public final class NetconfDevice implements RemoteDevice salFacade; - private final List cache = new LinkedList<>(); - private final MessageTransformer messageTransformer; - private boolean passNotifications = false; + public NetconfStateSchemas.NetconfStateSchemasResolver getStateSchemasResolver() { + return stateSchemasResolver; + } + } + + /** + * Schema building callable. + */ + private static class DeviceSourcesResolver implements Callable { + private final NetconfDeviceRpc deviceRpc; + private final NetconfSessionCapabilities remoteSessionCapabilities; private final RemoteDeviceId id; + private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; - NotificationHandler(final RemoteDeviceHandler salFacade, final MessageTransformer messageTransformer, final RemoteDeviceId id) { - this.salFacade = salFacade; - this.messageTransformer = messageTransformer; + public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { + this.deviceRpc = deviceRpc; + this.remoteSessionCapabilities = remoteSessionCapabilities; this.id = id; + this.stateSchemasResolver = stateSchemasResolver; } - synchronized void handleNotification(final NetconfMessage notification) { - if(passNotifications) { - passNotification(messageTransformer.toNotification(notification)); - } else { - cacheNotification(notification); + @Override + public DeviceSources call() throws Exception { + + final Set requiredSources = Sets.newHashSet(Collections2.transform( + remoteSessionCapabilities.getModuleBasedCaps(), QNAME_TO_SOURCE_ID_FUNCTION)); + + // If monitoring is not supported, we will still attempt to create schema, sources might be already provided + final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id); + logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames()); + + final Set providedSources = Sets.newHashSet(Collections2.transform( + availableSchemas.getAvailableYangSchemasQNames(), QNAME_TO_SOURCE_ID_FUNCTION)); + + final Set requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources); + + if (!requiredSourcesNotProvided.isEmpty()) { + logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}", + id, requiredSourcesNotProvided); + logger.warn("{}: Attempting to build schema context from required sources", id); } - } - /** - * Forward all cached notifications and pass all notifications from this point directly to sal facade. - */ - synchronized void onRemoteSchemaUp() { - passNotifications = true; - for (final NetconfMessage cachedNotification : cache) { - passNotification(messageTransformer.toNotification(cachedNotification)); + // TODO should we perform this ? We have a mechanism to fix initialization of devices not reporting or required modules in hello + // That is overriding capabilities in configuration using attribute yang-module-capabilities + // This is more user friendly even though it clashes with attribute yang-module-capabilities + // Some devices do not report all required models in hello message, but provide them + final Set providedSourcesNotRequired = Sets.difference(providedSources, requiredSources); + if (!providedSourcesNotRequired.isEmpty()) { + logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}", + id, providedSourcesNotRequired); + logger.warn("{}: Adding provided but not required sources as required to prevent failures", id); + requiredSources.addAll(providedSourcesNotRequired); } - cache.clear(); + return new DeviceSources(requiredSources, providedSources); } + } - private void cacheNotification(final NetconfMessage notification) { - Preconditions.checkState(passNotifications == false); + /** + * Contains RequiredSources - sources from capabilities. + * + */ + private static final class DeviceSources { + private final Collection requiredSources; + private final Collection providedSources; - logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification); - if(logger.isTraceEnabled()) { - logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument())); - } + public DeviceSources(final Collection requiredSources, final Collection providedSources) { + this.requiredSources = requiredSources; + this.providedSources = providedSources; + } - cache.add(notification); + public Collection getRequiredSources() { + return requiredSources; } - private void passNotification(final CompositeNode parsedNotification) { - logger.debug("{}: Forwarding notification {}", id, parsedNotification); - Preconditions.checkNotNull(parsedNotification); - salFacade.onNotification(parsedNotification); + public Collection getProvidedSources() { + return providedSources; } + } + /** + * Schema builder that tries to build schema context from provided sources or biggest subset of it. + */ + private final class RecursiveSchemaSetup implements Runnable { + private final DeviceSources deviceSources; + private final NetconfSessionCapabilities remoteSessionCapabilities; + private final NetconfDeviceRpc deviceRpc; + private final RemoteDeviceCommunicator listener; + + public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionCapabilities remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator listener) { + this.deviceSources = deviceSources; + this.remoteSessionCapabilities = remoteSessionCapabilities; + this.deviceRpc = deviceRpc; + this.listener = listener; + } + + @Override + public void run() { + setUpSchema(deviceSources.getRequiredSources()); + } + + /** + * Recursively build schema context, in case of success or final failure notify device + */ + private void setUpSchema(final Collection requiredSources) { + logger.trace("{}: Trying to build schema context from {}", id, requiredSources); + + // If no more sources, fail + if(requiredSources.isEmpty()) { + handleSalInitializationFailure(new IllegalStateException(id + ": No more sources for schema context"), listener); + return; + } + + final CheckedFuture schemaBuilderFuture = schemaContextFactory.createSchemaContext(requiredSources); + + final FutureCallback RecursiveSchemaBuilderCallback = new FutureCallback() { + + @Override + public void onSuccess(final SchemaContext result) { + logger.debug("{}: Schema context built successfully from {}", id, requiredSources); + handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc); + } + + @Override + public void onFailure(final Throwable t) { + // In case source missing, try without it + if (t instanceof MissingSchemaSourceException) { + final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId(); + logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource); + setUpSchema(stripMissingSource(requiredSources, missingSource)); + + // In case resolution error, try only with resolved sources + } else if (t instanceof SchemaResolutionException) { + // TODO check for infinite loop + final SchemaResolutionException resolutionException = (SchemaResolutionException) t; + logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports()); + setUpSchema(resolutionException.getResolvedSources()); + // unknown error, fail + } else { + handleSalInitializationFailure(t, listener); + } + } + }; + + Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback); + } + + private Collection stripMissingSource(final Collection requiredSources, final SourceIdentifier sIdToRemove) { + final LinkedList sourceIdentifiers = Lists.newLinkedList(requiredSources); + final boolean removed = sourceIdentifiers.remove(sIdToRemove); + Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources); + return sourceIdentifiers; + } + } }