*/
package org.opendaylight.controller.sal.connect.netconf;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import java.io.InputStream;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.controller.sal.connect.api.MessageTransformer;
import org.opendaylight.controller.sal.connect.api.RemoteDevice;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
-import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory;
-import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
-import org.opendaylight.controller.sal.connect.netconf.schema.NetconfDeviceSchemaProviderFactory;
-import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaSourceProvider;
-import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
+import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
-import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
*/
-public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilities, NetconfMessage> {
+public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
+ public static final Function<QName, SourceIdentifier> QNAME_TO_SOURCE_ID_FUNCTION = new Function<QName, SourceIdentifier>() {
+ @Override
+ public SourceIdentifier apply(final QName input) {
+ return new SourceIdentifier(input.getLocalName(), Optional.fromNullable(input.getFormattedRevision()));
+ }
+ };
+
private final RemoteDeviceId id;
+ private final boolean reconnectOnSchemasChange;
- private final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade;
+ private final SchemaContextFactory schemaContextFactory;
+ private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
private final ListeningExecutorService processingExecutor;
+ private final SchemaSourceRegistry schemaRegistry;
private final MessageTransformer<NetconfMessage> messageTransformer;
- private final SchemaContextProviderFactory schemaContextProviderFactory;
- private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
+ private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
private final NotificationHandler notificationHandler;
+ private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
- public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
- final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
- final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
-
- return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(),
- new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory<InputStream>() {
- @Override
- public SchemaSourceProvider<InputStream> createSourceProvider(final RpcImplementation deviceRpc) {
- return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id,
- deviceRpc));
- }
- });
+ public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+ final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer) {
+ this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, messageTransformer, false);
}
- @VisibleForTesting
- protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
- final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
- final SchemaContextProviderFactory schemaContextProviderFactory,
- final SchemaSourceProviderFactory<InputStream> sourceProviderFactory) {
+ // FIXME reduce parameters
+ public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
+ final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer, final boolean reconnectOnSchemasChange) {
this.id = id;
+ this.reconnectOnSchemasChange = reconnectOnSchemasChange;
+ this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
this.messageTransformer = messageTransformer;
+ this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
this.salFacade = salFacade;
- this.sourceProviderFactory = sourceProviderFactory;
- this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
- this.schemaContextProviderFactory = schemaContextProviderFactory;
+ this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver();
+ this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor);
this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
}
@Override
- public void onRemoteSessionUp(final NetconfSessionCapabilities remoteSessionCapabilities,
- final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
+ final NetconfDeviceCommunicator listener) {
// SchemaContext setup has to be performed in a dedicated thread since
// we are in a netty thread in this method
// Yang models are being downloaded in this method and it would cause a
// http://netty.io/wiki/thread-model.html
logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
- final ListenableFuture<?> salInitializationFuture = processingExecutor.submit(new Runnable() {
+ final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(listener);
+
+ final DeviceSourcesResolver task = new DeviceSourcesResolver(deviceRpc, remoteSessionCapabilities, id, stateSchemasResolver);
+ final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(task);
+
+ if(shouldListenOnSchemaChange(remoteSessionCapabilities)) {
+ registerToBaseNetconfStream(deviceRpc, listener);
+ }
+
+ final FutureCallback<DeviceSources> resolvedSourceCallback = new FutureCallback<DeviceSources>() {
@Override
- public void run() {
- final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener);
- final SchemaSourceProvider<InputStream> delegate = sourceProviderFactory.createSourceProvider(deviceRpc);
- final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
- updateMessageTransformer(schemaContextProvider);
- salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
- notificationHandler.onRemoteSchemaUp();
+ public void onSuccess(final DeviceSources result) {
+ addProvidedSourcesToSchemaRegistry(deviceRpc, result);
+ setUpSchema(result);
}
- });
- Futures.addCallback(salInitializationFuture, new FutureCallback<Object>() {
+ private void setUpSchema(final DeviceSources result) {
+ processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, deviceRpc, listener));
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ logger.warn("{}: Unexpected error resolving device sources: {}", id, t);
+ handleSalInitializationFailure(t, listener);
+ }
+ };
+
+ Futures.addCallback(sourceResolverFuture, resolvedSourceCallback);
+
+ }
+
+ private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) {
+ final ListenableFuture<RpcResult<CompositeNode>> rpcResultListenableFuture =
+ deviceRpc.invokeRpc(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME, NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+
+ final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
+ @Override
+ public Optional<CompositeNode> filterNotification(final CompositeNode notification) {
+ if (isCapabilityChanged(notification)) {
+ logger.info("{}: Schemas change detected, reconnecting", id);
+ // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting
+ listener.disconnect();
+ return Optional.absent();
+ }
+ return Optional.of(notification);
+ }
+
+ private boolean isCapabilityChanged(final CompositeNode notification) {
+ return notification.getNodeType().equals(NetconfCapabilityChange.QNAME);
+ }
+ };
+
+ Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<CompositeNode>>() {
@Override
- public void onSuccess(final Object result) {
- logger.debug("{}: Initialization in sal successful", id);
- logger.info("{}: Netconf connector initialized successfully", id);
+ public void onSuccess(final RpcResult<CompositeNode> result) {
+ notificationHandler.addNotificationFilter(filter);
}
@Override
public void onFailure(final Throwable t) {
- // Unable to initialize device, set as disconnected
- logger.error("{}: Initialization failed", id, t);
- salFacade.onDeviceDisconnected();
- // TODO ssh connection is still open if sal initialization fails
+ logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t);
}
});
}
+ private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) {
+ return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
+ }
+
+ private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) {
+ updateMessageTransformer(result);
+ salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
+ notificationHandler.onRemoteSchemaUp();
+
+ logger.info("{}: Netconf connector initialized successfully", id);
+ }
+
+ private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ logger.error("{}: Initialization in sal failed, disconnecting from device", id, t);
+ listener.close();
+ onRemoteSessionDown();
+ resetMessageTransformer();
+ }
+
+ /**
+ * Set the schema context inside transformer to null as is in initial state
+ */
+ private void resetMessageTransformer() {
+ updateMessageTransformer(null);
+ }
+
/**
* Update initial message transformer to use retrieved schema
*/
- private void updateMessageTransformer(final SchemaContextProvider schemaContextProvider) {
- messageTransformer.onGlobalContextUpdated(schemaContextProvider.getSchemaContext());
+ private void updateMessageTransformer(final SchemaContext currentSchemaContext) {
+ messageTransformer.onGlobalContextUpdated(currentSchemaContext);
}
- private SchemaContextProvider setUpSchemaContext(final SchemaSourceProvider<InputStream> sourceProvider, final NetconfSessionCapabilities capabilities) {
- return schemaContextProviderFactory.createContextProvider(capabilities.getModuleBasedCaps(), sourceProvider);
+ private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) {
+ final NetconfRemoteSchemaYangSourceProvider yangProvider = new NetconfRemoteSchemaYangSourceProvider(id, deviceRpc);
+ for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) {
+ sourceRegistrations.add(schemaRegistry.registerSchemaSource(yangProvider,
+ PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+ }
}
- private NetconfDeviceRpc setUpDeviceRpc(final NetconfSessionCapabilities capHolder, final RemoteDeviceCommunicator<NetconfMessage> listener) {
- Preconditions.checkArgument(capHolder.isMonitoringSupported(),
- "%s: Netconf device does not support netconf monitoring, yang schemas cannot be acquired. Netconf device capabilities", capHolder);
- return new NetconfDeviceRpc(listener, messageTransformer);
+ private NetconfDeviceRpc setUpDeviceRpc(final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ return new NetconfDeviceRpc(listener, messageTransformer);
}
@Override
public void onRemoteSessionDown() {
+ notificationHandler.onRemoteSchemaDown();
+
salFacade.onDeviceDisconnected();
+ for (final SchemaSourceRegistration<? extends SchemaSourceRepresentation> sourceRegistration : sourceRegistrations) {
+ sourceRegistration.close();
+ }
+ resetMessageTransformer();
+ }
+
+ @Override
+ public void onRemoteSessionFailed(Throwable throwable) {
+ salFacade.onDeviceFailed(throwable);
}
@Override
}
/**
- * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
+ * Just a transfer object containing schema related dependencies. Injected in constructor.
*/
- private final static class NotificationHandler {
+ public static class SchemaResourcesDTO {
+ private final SchemaSourceRegistry schemaRegistry;
+ private final SchemaContextFactory schemaContextFactory;
+ private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
- private final RemoteDeviceHandler<?> salFacade;
- private final List<NetconfMessage> cache = new LinkedList<>();
- private final MessageTransformer<NetconfMessage> messageTransformer;
- private boolean passNotifications = false;
+ public SchemaResourcesDTO(final SchemaSourceRegistry schemaRegistry, final SchemaContextFactory schemaContextFactory, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
+ this.schemaRegistry = Preconditions.checkNotNull(schemaRegistry);
+ this.schemaContextFactory = Preconditions.checkNotNull(schemaContextFactory);
+ this.stateSchemasResolver = Preconditions.checkNotNull(stateSchemasResolver);
+ }
+
+ public SchemaSourceRegistry getSchemaRegistry() {
+ return schemaRegistry;
+ }
+
+ public SchemaContextFactory getSchemaContextFactory() {
+ return schemaContextFactory;
+ }
+
+ public NetconfStateSchemas.NetconfStateSchemasResolver getStateSchemasResolver() {
+ return stateSchemasResolver;
+ }
+ }
+
+ /**
+ * Schema building callable.
+ */
+ private static class DeviceSourcesResolver implements Callable<DeviceSources> {
+ private final NetconfDeviceRpc deviceRpc;
+ private final NetconfSessionPreferences remoteSessionCapabilities;
private final RemoteDeviceId id;
+ private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
- NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
- this.salFacade = salFacade;
- this.messageTransformer = messageTransformer;
+ public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
+ this.deviceRpc = deviceRpc;
+ this.remoteSessionCapabilities = remoteSessionCapabilities;
this.id = id;
+ this.stateSchemasResolver = stateSchemasResolver;
}
- synchronized void handleNotification(final NetconfMessage notification) {
- if(passNotifications) {
- passNotification(messageTransformer.toNotification(notification));
- } else {
- cacheNotification(notification);
+ @Override
+ public DeviceSources call() throws Exception {
+
+ final Set<SourceIdentifier> requiredSources = Sets.newHashSet(Collections2.transform(
+ remoteSessionCapabilities.getModuleBasedCaps(), QNAME_TO_SOURCE_ID_FUNCTION));
+
+ // If monitoring is not supported, we will still attempt to create schema, sources might be already provided
+ final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
+ logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
+
+ final Set<SourceIdentifier> providedSources = Sets.newHashSet(Collections2.transform(
+ availableSchemas.getAvailableYangSchemasQNames(), QNAME_TO_SOURCE_ID_FUNCTION));
+
+ final Set<SourceIdentifier> requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources);
+
+ if (!requiredSourcesNotProvided.isEmpty()) {
+ logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}",
+ id, requiredSourcesNotProvided);
+ logger.warn("{}: Attempting to build schema context from required sources", id);
}
+
+
+ // TODO should we perform this ? We have a mechanism to fix initialization of devices not reporting or required modules in hello
+ // That is overriding capabilities in configuration using attribute yang-module-capabilities
+ // This is more user friendly even though it clashes with attribute yang-module-capabilities
+ // Some devices do not report all required models in hello message, but provide them
+ final Set<SourceIdentifier> providedSourcesNotRequired = Sets.difference(providedSources, requiredSources);
+ if (!providedSourcesNotRequired.isEmpty()) {
+ logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}",
+ id, providedSourcesNotRequired);
+ logger.warn("{}: Adding provided but not required sources as required to prevent failures", id);
+ logger.debug("{}: Netconf device reported in hello: {}", id, requiredSources);
+ requiredSources.addAll(providedSourcesNotRequired);
+ }
+
+ return new DeviceSources(requiredSources, providedSources);
+ }
+ }
+
+ /**
+ * Contains RequiredSources - sources from capabilities.
+ *
+ */
+ private static final class DeviceSources {
+ private final Collection<SourceIdentifier> requiredSources;
+ private final Collection<SourceIdentifier> providedSources;
+
+ public DeviceSources(final Collection<SourceIdentifier> requiredSources, final Collection<SourceIdentifier> providedSources) {
+ this.requiredSources = requiredSources;
+ this.providedSources = providedSources;
+ }
+
+ public Collection<SourceIdentifier> getRequiredSources() {
+ return requiredSources;
+ }
+
+ public Collection<SourceIdentifier> getProvidedSources() {
+ return providedSources;
+ }
+
+ }
+
+ /**
+ * Schema builder that tries to build schema context from provided sources or biggest subset of it.
+ */
+ private final class RecursiveSchemaSetup implements Runnable {
+ private final DeviceSources deviceSources;
+ private final NetconfSessionPreferences remoteSessionCapabilities;
+ private final NetconfDeviceRpc deviceRpc;
+ private final RemoteDeviceCommunicator<NetconfMessage> listener;
+ private NetconfDeviceCapabilities capabilities;
+
+ public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ this.deviceSources = deviceSources;
+ this.remoteSessionCapabilities = remoteSessionCapabilities;
+ this.deviceRpc = deviceRpc;
+ this.listener = listener;
+ this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
+ }
+
+ @Override
+ public void run() {
+ setUpSchema(deviceSources.getRequiredSources());
}
/**
- * Forward all cached notifications and pass all notifications from this point directly to sal facade.
+ * Recursively build schema context, in case of success or final failure notify device
*/
- synchronized void onRemoteSchemaUp() {
- passNotifications = true;
+ // FIXME reimplement without recursion
+ private void setUpSchema(final Collection<SourceIdentifier> requiredSources) {
+ logger.trace("{}: Trying to build schema context from {}", id, requiredSources);
- for (final NetconfMessage cachedNotification : cache) {
- passNotification(messageTransformer.toNotification(cachedNotification));
+ // If no more sources, fail
+ if(requiredSources.isEmpty()) {
+ handleSalInitializationFailure(new IllegalStateException(id + ": No more sources for schema context"), listener);
+ return;
}
- cache.clear();
- }
+ final CheckedFuture<SchemaContext, SchemaResolutionException> schemaBuilderFuture = schemaContextFactory.createSchemaContext(requiredSources);
- private void cacheNotification(final NetconfMessage notification) {
- Preconditions.checkState(passNotifications == false);
+ final FutureCallback<SchemaContext> RecursiveSchemaBuilderCallback = new FutureCallback<SchemaContext>() {
- logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
- if(logger.isTraceEnabled()) {
- logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
- }
+ @Override
+ public void onSuccess(final SchemaContext result) {
+ logger.debug("{}: Schema context built successfully from {}", id, requiredSources);
+ Collection<QName> filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet());
+ capabilities.addCapabilities(filteredQNames);
+ capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps());
+ handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ // In case source missing, try without it
+ if (t instanceof MissingSchemaSourceException) {
+ final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId();
+ logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource);
+ capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), UnavailableCapability.FailureReason.MissingSource);
+ setUpSchema(stripMissingSource(requiredSources, missingSource));
- cache.add(notification);
+ // In case resolution error, try only with resolved sources
+ } else if (t instanceof SchemaResolutionException) {
+ // TODO check for infinite loop
+ final SchemaResolutionException resolutionException = (SchemaResolutionException) t;
+ final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
+ capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve);
+ logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports());
+ setUpSchema(resolutionException.getResolvedSources());
+ // unknown error, fail
+ } else {
+ handleSalInitializationFailure(t, listener);
+ }
+ }
+ };
+
+ Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback);
}
- private void passNotification(final CompositeNode parsedNotification) {
- logger.debug("{}: Forwarding notification {}", id, parsedNotification);
- Preconditions.checkNotNull(parsedNotification);
- salFacade.onNotification(parsedNotification);
+ private Collection<SourceIdentifier> stripMissingSource(final Collection<SourceIdentifier> requiredSources, final SourceIdentifier sIdToRemove) {
+ final LinkedList<SourceIdentifier> sourceIdentifiers = Lists.newLinkedList(requiredSources);
+ final boolean removed = sourceIdentifiers.remove(sIdToRemove);
+ Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources);
+ return sourceIdentifiers;
}
+ private Collection<QName> getQNameFromSourceIdentifiers(Collection<SourceIdentifier> identifiers) {
+ Collection<QName> qNames = new HashSet<>();
+ for (SourceIdentifier source : identifiers) {
+ Optional<QName> qname = getQNameFromSourceIdentifier(source);
+ if (qname.isPresent()) {
+ qNames.add(qname.get());
+ }
+ }
+ if (qNames.isEmpty()) {
+ logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers);
+ }
+ return qNames;
+ }
+
+ private Optional<QName> getQNameFromSourceIdentifier(SourceIdentifier identifier) {
+ for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) {
+ if (qname.getLocalName().equals(identifier.getName())
+ && qname.getFormattedRevision().equals(identifier.getRevision())) {
+ return Optional.of(qname);
+ }
+ }
+ throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier);
+ }
}
}