X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.java;h=9a5b239024c5bb0cbca3798de58ccc102a994224;hp=350132cf99a5dfb05681b0732912e7a4220335d2;hb=83dfe301bf2a2b1eff6883a2af3282c95d5a752e;hpb=f4b583dd481d8db60c894690a6c9189922f360a9 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index 350132cf99..9a5b239024 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -7,99 +7,103 @@ */ package org.opendaylight.controller.sal.connect.netconf; -import java.io.InputStream; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collection; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; - import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.controller.sal.connect.api.MessageTransformer; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; -import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory; -import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; -import org.opendaylight.controller.sal.connect.netconf.schema.NetconfDeviceSchemaProviderFactory; -import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaSourceProvider; -import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer; +import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider; +import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; -import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; -import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider; -import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation; +import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; +import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - /** * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade */ -public final class NetconfDevice implements RemoteDevice { +public final class NetconfDevice implements RemoteDevice { private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class); + public static final Function QNAME_TO_SOURCE_ID_FUNCTION = new Function() { + @Override + public SourceIdentifier apply(final QName input) { + return new SourceIdentifier(input.getLocalName(), Optional.fromNullable(input.getFormattedRevision())); + } + }; + private final RemoteDeviceId id; + private final boolean reconnectOnSchemasChange; - private final RemoteDeviceHandler salFacade; + private final SchemaContextFactory schemaContextFactory; + private final RemoteDeviceHandler salFacade; private final ListeningExecutorService processingExecutor; + private final SchemaSourceRegistry schemaRegistry; private final MessageTransformer messageTransformer; - private final SchemaContextProviderFactory schemaContextProviderFactory; - private final SchemaSourceProviderFactory sourceProviderFactory; private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; private final NotificationHandler notificationHandler; + private final List> sourceRegistrations = Lists.newArrayList(); - public static NetconfDevice createNetconfDevice(final RemoteDeviceId id, - final AbstractCachingSchemaSourceProvider schemaSourceProvider, - final ExecutorService executor, final RemoteDeviceHandler salFacade) { - return createNetconfDevice(id, schemaSourceProvider, executor, salFacade, new NetconfStateSchemas.NetconfStateSchemasResolverImpl()); - } - - @VisibleForTesting - protected static NetconfDevice createNetconfDevice(final RemoteDeviceId id, - final AbstractCachingSchemaSourceProvider schemaSourceProvider, - final ExecutorService executor, final RemoteDeviceHandler salFacade, - final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { - - return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(), - new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory() { - @Override - public SchemaSourceProvider createSourceProvider(final RpcImplementation deviceRpc) { - return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id, - deviceRpc)); - } - }, stateSchemasResolver); + public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ExecutorService globalProcessingExecutor, final MessageTransformer messageTransformer) { + this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, messageTransformer, false); } - @VisibleForTesting - protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ExecutorService processingExecutor, final MessageTransformer messageTransformer, - final SchemaContextProviderFactory schemaContextProviderFactory, - final SchemaSourceProviderFactory sourceProviderFactory, - final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { + // FIXME reduce parameters + public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ExecutorService globalProcessingExecutor, final MessageTransformer messageTransformer, final boolean reconnectOnSchemasChange) { this.id = id; + this.reconnectOnSchemasChange = reconnectOnSchemasChange; + this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry(); this.messageTransformer = messageTransformer; + this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory(); this.salFacade = salFacade; - this.sourceProviderFactory = sourceProviderFactory; - this.stateSchemasResolver = stateSchemasResolver; - this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor); - this.schemaContextProviderFactory = schemaContextProviderFactory; + this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver(); + this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor); this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id); } @Override - public void onRemoteSessionUp(final NetconfSessionCapabilities remoteSessionCapabilities, - final RemoteDeviceCommunicator listener) { + public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, + final NetconfDeviceCommunicator listener) { // SchemaContext setup has to be performed in a dedicated thread since // we are in a netty thread in this method // Yang models are being downloaded in this method and it would cause a @@ -107,60 +111,128 @@ public final class NetconfDevice implements RemoteDevice salInitializationFuture = processingExecutor.submit(new Runnable() { + final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(listener); + + final DeviceSourcesResolver task = new DeviceSourcesResolver(deviceRpc, remoteSessionCapabilities, id, stateSchemasResolver); + final ListenableFuture sourceResolverFuture = processingExecutor.submit(task); + + if(shouldListenOnSchemaChange(remoteSessionCapabilities)) { + registerToBaseNetconfStream(deviceRpc, listener); + } + + final FutureCallback resolvedSourceCallback = new FutureCallback() { @Override - public void run() { - final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener); - - final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id); - logger.warn("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames()); - // TODO use this for shared schema context - - final SchemaSourceProvider delegate = sourceProviderFactory.createSourceProvider(deviceRpc); - final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities); - updateMessageTransformer(schemaContextProvider); - salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc); - notificationHandler.onRemoteSchemaUp(); + public void onSuccess(final DeviceSources result) { + addProvidedSourcesToSchemaRegistry(deviceRpc, result); + setUpSchema(result); + } + + private void setUpSchema(final DeviceSources result) { + processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, deviceRpc, listener)); } - }); - Futures.addCallback(salInitializationFuture, new FutureCallback() { @Override - public void onSuccess(final Object result) { - logger.debug("{}: Initialization in sal successful", id); - logger.info("{}: Netconf connector initialized successfully", id); + public void onFailure(final Throwable t) { + logger.warn("{}: Unexpected error resolving device sources: {}", id, t); + handleSalInitializationFailure(t, listener); + } + }; + + Futures.addCallback(sourceResolverFuture, resolvedSourceCallback); + + } + + private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) { + final ListenableFuture> rpcResultListenableFuture = + deviceRpc.invokeRpc(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME, NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT); + + final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() { + @Override + public Optional filterNotification(final CompositeNode notification) { + if (isCapabilityChanged(notification)) { + logger.info("{}: Schemas change detected, reconnecting", id); + // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting + listener.disconnect(); + return Optional.absent(); + } + return Optional.of(notification); + } + + private boolean isCapabilityChanged(final CompositeNode notification) { + return notification.getNodeType().equals(NetconfCapabilityChange.QNAME); + } + }; + + Futures.addCallback(rpcResultListenableFuture, new FutureCallback>() { + @Override + public void onSuccess(final RpcResult result) { + notificationHandler.addNotificationFilter(filter); } @Override public void onFailure(final Throwable t) { - // Unable to initialize device, set as disconnected - logger.error("{}: Initialization failed", id, t); - salFacade.onDeviceDisconnected(); - // TODO ssh connection is still open if sal initialization fails + logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t); } }); } + private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) { + return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange; + } + + private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) { + updateMessageTransformer(result); + salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc); + notificationHandler.onRemoteSchemaUp(); + + logger.info("{}: Netconf connector initialized successfully", id); + } + + private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator listener) { + logger.error("{}: Initialization in sal failed, disconnecting from device", id, t); + listener.close(); + onRemoteSessionDown(); + resetMessageTransformer(); + } + + /** + * Set the schema context inside transformer to null as is in initial state + */ + private void resetMessageTransformer() { + updateMessageTransformer(null); + } + /** * Update initial message transformer to use retrieved schema */ - private void updateMessageTransformer(final SchemaContextProvider schemaContextProvider) { - messageTransformer.onGlobalContextUpdated(schemaContextProvider.getSchemaContext()); + private void updateMessageTransformer(final SchemaContext currentSchemaContext) { + messageTransformer.onGlobalContextUpdated(currentSchemaContext); } - private SchemaContextProvider setUpSchemaContext(final SchemaSourceProvider sourceProvider, final NetconfSessionCapabilities capabilities) { - return schemaContextProviderFactory.createContextProvider(capabilities.getModuleBasedCaps(), sourceProvider); + private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) { + final NetconfRemoteSchemaYangSourceProvider yangProvider = new NetconfRemoteSchemaYangSourceProvider(id, deviceRpc); + for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) { + sourceRegistrations.add(schemaRegistry.registerSchemaSource(yangProvider, + PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))); + } } - private NetconfDeviceRpc setUpDeviceRpc(final NetconfSessionCapabilities capHolder, final RemoteDeviceCommunicator listener) { - Preconditions.checkArgument(capHolder.isMonitoringSupported(), - "%s: Netconf device does not support netconf monitoring, yang schemas cannot be acquired. Netconf device capabilities", capHolder); - return new NetconfDeviceRpc(listener, messageTransformer); + private NetconfDeviceRpc setUpDeviceRpc(final RemoteDeviceCommunicator listener) { + return new NetconfDeviceRpc(listener, messageTransformer); } @Override public void onRemoteSessionDown() { salFacade.onDeviceDisconnected(); + for (final SchemaSourceRegistration sourceRegistration : sourceRegistrations) { + sourceRegistration.close(); + } + resetMessageTransformer(); + } + + @Override + public void onRemoteSessionFailed(Throwable throwable) { + salFacade.onDeviceFailed(throwable); } @Override @@ -169,59 +241,214 @@ public final class NetconfDevice implements RemoteDevice salFacade; - private final List cache = new LinkedList<>(); - private final MessageTransformer messageTransformer; - private boolean passNotifications = false; + /** + * Schema building callable. + */ + private static class DeviceSourcesResolver implements Callable { + private final NetconfDeviceRpc deviceRpc; + private final NetconfSessionPreferences remoteSessionCapabilities; private final RemoteDeviceId id; + private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; - NotificationHandler(final RemoteDeviceHandler salFacade, final MessageTransformer messageTransformer, final RemoteDeviceId id) { - this.salFacade = salFacade; - this.messageTransformer = messageTransformer; + public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { + this.deviceRpc = deviceRpc; + this.remoteSessionCapabilities = remoteSessionCapabilities; this.id = id; + this.stateSchemasResolver = stateSchemasResolver; } - synchronized void handleNotification(final NetconfMessage notification) { - if(passNotifications) { - passNotification(messageTransformer.toNotification(notification)); - } else { - cacheNotification(notification); + @Override + public DeviceSources call() throws Exception { + + final Set requiredSources = Sets.newHashSet(Collections2.transform( + remoteSessionCapabilities.getModuleBasedCaps(), QNAME_TO_SOURCE_ID_FUNCTION)); + + // If monitoring is not supported, we will still attempt to create schema, sources might be already provided + final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id); + logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames()); + + final Set providedSources = Sets.newHashSet(Collections2.transform( + availableSchemas.getAvailableYangSchemasQNames(), QNAME_TO_SOURCE_ID_FUNCTION)); + + final Set requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources); + + if (!requiredSourcesNotProvided.isEmpty()) { + logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}", + id, requiredSourcesNotProvided); + logger.warn("{}: Attempting to build schema context from required sources", id); + } + + + // TODO should we perform this ? We have a mechanism to fix initialization of devices not reporting or required modules in hello + // That is overriding capabilities in configuration using attribute yang-module-capabilities + // This is more user friendly even though it clashes with attribute yang-module-capabilities + // Some devices do not report all required models in hello message, but provide them + final Set providedSourcesNotRequired = Sets.difference(providedSources, requiredSources); + if (!providedSourcesNotRequired.isEmpty()) { + logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}", + id, providedSourcesNotRequired); + logger.warn("{}: Adding provided but not required sources as required to prevent failures", id); + requiredSources.addAll(providedSourcesNotRequired); } + + return new DeviceSources(requiredSources, providedSources); + } + } + + /** + * Contains RequiredSources - sources from capabilities. + * + */ + private static final class DeviceSources { + private final Collection requiredSources; + private final Collection providedSources; + + public DeviceSources(final Collection requiredSources, final Collection providedSources) { + this.requiredSources = requiredSources; + this.providedSources = providedSources; + } + + public Collection getRequiredSources() { + return requiredSources; + } + + public Collection getProvidedSources() { + return providedSources; + } + + } + + /** + * Schema builder that tries to build schema context from provided sources or biggest subset of it. + */ + private final class RecursiveSchemaSetup implements Runnable { + private final DeviceSources deviceSources; + private final NetconfSessionPreferences remoteSessionCapabilities; + private final NetconfDeviceRpc deviceRpc; + private final RemoteDeviceCommunicator listener; + private NetconfDeviceCapabilities capabilities; + + public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator listener) { + this.deviceSources = deviceSources; + this.remoteSessionCapabilities = remoteSessionCapabilities; + this.deviceRpc = deviceRpc; + this.listener = listener; + this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities(); + } + + @Override + public void run() { + setUpSchema(deviceSources.getRequiredSources()); } /** - * Forward all cached notifications and pass all notifications from this point directly to sal facade. + * Recursively build schema context, in case of success or final failure notify device */ - synchronized void onRemoteSchemaUp() { - passNotifications = true; - - for (final NetconfMessage cachedNotification : cache) { - passNotification(messageTransformer.toNotification(cachedNotification)); + // FIXME reimplement without recursion + private void setUpSchema(final Collection requiredSources) { + logger.trace("{}: Trying to build schema context from {}", id, requiredSources); + + // If no more sources, fail + if(requiredSources.isEmpty()) { + handleSalInitializationFailure(new IllegalStateException(id + ": No more sources for schema context"), listener); + return; } - cache.clear(); + final CheckedFuture schemaBuilderFuture = schemaContextFactory.createSchemaContext(requiredSources); + + final FutureCallback RecursiveSchemaBuilderCallback = new FutureCallback() { + + @Override + public void onSuccess(final SchemaContext result) { + logger.debug("{}: Schema context built successfully from {}", id, requiredSources); + Collection filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet()); + capabilities.addCapabilities(filteredQNames); + capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps()); + handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc); + } + + @Override + public void onFailure(final Throwable t) { + // In case source missing, try without it + if (t instanceof MissingSchemaSourceException) { + final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId(); + logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource); + capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), UnavailableCapability.FailureReason.MissingSource); + setUpSchema(stripMissingSource(requiredSources, missingSource)); + + // In case resolution error, try only with resolved sources + } else if (t instanceof SchemaResolutionException) { + // TODO check for infinite loop + final SchemaResolutionException resolutionException = (SchemaResolutionException) t; + final Set unresolvedSources = resolutionException.getUnsatisfiedImports().keySet(); + capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve); + logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports()); + setUpSchema(resolutionException.getResolvedSources()); + // unknown error, fail + } else { + handleSalInitializationFailure(t, listener); + } + } + }; + + Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback); } - private void cacheNotification(final NetconfMessage notification) { - Preconditions.checkState(passNotifications == false); + private Collection stripMissingSource(final Collection requiredSources, final SourceIdentifier sIdToRemove) { + final LinkedList sourceIdentifiers = Lists.newLinkedList(requiredSources); + final boolean removed = sourceIdentifiers.remove(sIdToRemove); + Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources); + return sourceIdentifiers; + } - logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification); - if(logger.isTraceEnabled()) { - logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument())); + private Collection getQNameFromSourceIdentifiers(Collection identifiers) { + Collection qNames = new HashSet<>(); + for (SourceIdentifier source : identifiers) { + Optional qname = getQNameFromSourceIdentifier(source); + if (qname.isPresent()) { + qNames.add(qname.get()); + } } - - cache.add(notification); + if (qNames.isEmpty()) { + logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers); + } + return qNames; } - private void passNotification(final CompositeNode parsedNotification) { - logger.debug("{}: Forwarding notification {}", id, parsedNotification); - Preconditions.checkNotNull(parsedNotification); - salFacade.onNotification(parsedNotification); + private Optional getQNameFromSourceIdentifier(SourceIdentifier identifier) { + for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) { + if (qname.getLocalName().equals(identifier.getName()) + && qname.getFormattedRevision().equals(identifier.getRevision())) { + return Optional.of(qname); + } + } + throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier); } } - }