*/
package org.opendaylight.controller.sal.connect.netconf;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.sal.connect.api.MessageTransformer;
import org.opendaylight.controller.sal.connect.api.RemoteDevice;
import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider;
+import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.$YangModuleInfoImpl;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability;
+import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
+ /**
+ * Initial schema context contains schemas for netconf monitoring and netconf notifications
+ */
+ public static final SchemaContext INIT_SCHEMA_CTX;
+
+ static {
+ try {
+ final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
+ moduleInfoBackedContext.addModuleInfos(
+ Lists.newArrayList(
+ $YangModuleInfoImpl.getInstance(),
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.$YangModuleInfoImpl.getInstance(),
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.$YangModuleInfoImpl.getInstance()));
+ INIT_SCHEMA_CTX = moduleInfoBackedContext.tryToCreateSchemaContext().get();
+ } catch (final RuntimeException e) {
+ logger.error("Unable to prepare schema context for netconf initialization", e);
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
public static final Function<QName, SourceIdentifier> QNAME_TO_SOURCE_ID_FUNCTION = new Function<QName, SourceIdentifier>() {
@Override
public SourceIdentifier apply(final QName input) {
private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
private final ListeningExecutorService processingExecutor;
private final SchemaSourceRegistry schemaRegistry;
- private final MessageTransformer<NetconfMessage> messageTransformer;
private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
private final NotificationHandler notificationHandler;
private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
+ // Message transformer is constructed once the schemas are available
+ private MessageTransformer<NetconfMessage> messageTransformer;
+
public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer) {
- this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, messageTransformer, false);
+ final ExecutorService globalProcessingExecutor) {
+ this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, false);
}
+ /**
+ * Create rpc implementation capable of handling RPC for monitoring and notifications even before the schemas of remote device are downloaded
+ */
+ static NetconfDeviceRpc getRpcForInitialization(final NetconfDeviceCommunicator listener) {
+ return new NetconfDeviceRpc(INIT_SCHEMA_CTX, listener, new NetconfMessageTransformer(INIT_SCHEMA_CTX, false));
+ }
+
+
// FIXME reduce parameters
public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer, final boolean reconnectOnSchemasChange) {
+ final ExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) {
this.id = id;
this.reconnectOnSchemasChange = reconnectOnSchemasChange;
this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
- this.messageTransformer = messageTransformer;
this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
this.salFacade = salFacade;
this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver();
this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor);
- this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
+ this.notificationHandler = new NotificationHandler(salFacade, id);
}
@Override
// http://netty.io/wiki/thread-model.html
logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
- final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(listener);
-
- final DeviceSourcesResolver task = new DeviceSourcesResolver(deviceRpc, remoteSessionCapabilities, id, stateSchemasResolver);
+ final NetconfDeviceRpc initRpc = getRpcForInitialization(listener);
+ final DeviceSourcesResolver task = new DeviceSourcesResolver(remoteSessionCapabilities, id, stateSchemasResolver, initRpc);
final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(task);
if(shouldListenOnSchemaChange(remoteSessionCapabilities)) {
- registerToBaseNetconfStream(deviceRpc, listener);
+ registerToBaseNetconfStream(initRpc, listener);
}
final FutureCallback<DeviceSources> resolvedSourceCallback = new FutureCallback<DeviceSources>() {
@Override
public void onSuccess(final DeviceSources result) {
- addProvidedSourcesToSchemaRegistry(deviceRpc, result);
+ addProvidedSourcesToSchemaRegistry(initRpc, result);
setUpSchema(result);
}
private void setUpSchema(final DeviceSources result) {
- processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, deviceRpc, listener));
+ processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, listener));
}
@Override
};
Futures.addCallback(sourceResolverFuture, resolvedSourceCallback);
-
}
private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) {
- final ListenableFuture<RpcResult<CompositeNode>> rpcResultListenableFuture =
- deviceRpc.invokeRpc(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME, NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+ // TODO check whether the model describing create subscription is present in schema
+ // Perhaps add a default schema context to support create-subscription if the model was not provided (same as what we do for base netconf operations in transformer)
+ final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultListenableFuture =
+ deviceRpc.invokeRpc(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME), NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
@Override
- public Optional<CompositeNode> filterNotification(final CompositeNode notification) {
+ public Optional<NormalizedNode<?, ?>> filterNotification(final NormalizedNode<?, ?> notification) {
if (isCapabilityChanged(notification)) {
logger.info("{}: Schemas change detected, reconnecting", id);
// Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting
listener.disconnect();
return Optional.absent();
}
- return Optional.of(notification);
+ return Optional.<NormalizedNode<?, ?>>of(notification);
}
- private boolean isCapabilityChanged(final CompositeNode notification) {
+ private boolean isCapabilityChanged(final NormalizedNode<?, ?> notification) {
return notification.getNodeType().equals(NetconfCapabilityChange.QNAME);
}
};
- Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<CompositeNode>>() {
+ Futures.addCallback(rpcResultListenableFuture, new FutureCallback<DOMRpcResult>() {
@Override
- public void onSuccess(final RpcResult<CompositeNode> result) {
+ public void onSuccess(final DOMRpcResult domRpcResult) {
notificationHandler.addNotificationFilter(filter);
}
return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
}
- private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) {
- updateMessageTransformer(result);
+ @VisibleForTesting
+ void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
+ messageTransformer = new NetconfMessageTransformer(result, true);
+
+ updateTransformer(messageTransformer);
+ // salFacade.onDeviceConnected has to be called before the notification handler is initialized
salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
- notificationHandler.onRemoteSchemaUp();
+ notificationHandler.onRemoteSchemaUp(messageTransformer);
logger.info("{}: Netconf connector initialized successfully", id);
}
}
/**
- * Set the schema context inside transformer to null as is in initial state
+ * Set the transformer to null as is in initial state
*/
private void resetMessageTransformer() {
- updateMessageTransformer(null);
+ updateTransformer(null);
}
- /**
- * Update initial message transformer to use retrieved schema
- */
- private void updateMessageTransformer(final SchemaContext currentSchemaContext) {
- messageTransformer.onGlobalContextUpdated(currentSchemaContext);
+ private void updateTransformer(final MessageTransformer<NetconfMessage> transformer) {
+ messageTransformer = transformer;
}
private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) {
}
}
- private NetconfDeviceRpc setUpDeviceRpc(final RemoteDeviceCommunicator<NetconfMessage> listener) {
- return new NetconfDeviceRpc(listener, messageTransformer);
- }
-
@Override
public void onRemoteSessionDown() {
+ notificationHandler.onRemoteSchemaDown();
+
salFacade.onDeviceDisconnected();
for (final SchemaSourceRegistration<? extends SchemaSourceRepresentation> sourceRegistration : sourceRegistrations) {
sourceRegistration.close();
}
@Override
- public void onRemoteSessionFailed(Throwable throwable) {
+ public void onRemoteSessionFailed(final Throwable throwable) {
salFacade.onDeviceFailed(throwable);
}
* Schema building callable.
*/
private static class DeviceSourcesResolver implements Callable<DeviceSources> {
+
private final NetconfDeviceRpc deviceRpc;
private final NetconfSessionPreferences remoteSessionCapabilities;
private final RemoteDeviceId id;
private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
- public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
+ DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities,
+ final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
this.deviceRpc = deviceRpc;
this.remoteSessionCapabilities = remoteSessionCapabilities;
this.id = id;
this.stateSchemasResolver = stateSchemasResolver;
}
+ public DeviceSourcesResolver(final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver, final NetconfDeviceRpc rpcForMonitoring) {
+ this(rpcForMonitoring, remoteSessionCapabilities, id, stateSchemasResolver);
+ }
+
@Override
public DeviceSources call() throws Exception {
-
- final Set<SourceIdentifier> requiredSources = Sets.newHashSet(Collections2.transform(
- remoteSessionCapabilities.getModuleBasedCaps(), QNAME_TO_SOURCE_ID_FUNCTION));
-
- // If monitoring is not supported, we will still attempt to create schema, sources might be already provided
final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
- final Set<SourceIdentifier> providedSources = Sets.newHashSet(Collections2.transform(
- availableSchemas.getAvailableYangSchemasQNames(), QNAME_TO_SOURCE_ID_FUNCTION));
-
- final Set<SourceIdentifier> requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources);
+ final Set<QName> requiredSources = Sets.newHashSet(remoteSessionCapabilities.getModuleBasedCaps());
+ final Set<QName> providedSources = availableSchemas.getAvailableYangSchemasQNames();
+ final Set<QName> requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources);
if (!requiredSourcesNotProvided.isEmpty()) {
logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}",
id, requiredSourcesNotProvided);
logger.warn("{}: Attempting to build schema context from required sources", id);
}
-
- // TODO should we perform this ? We have a mechanism to fix initialization of devices not reporting or required modules in hello
- // That is overriding capabilities in configuration using attribute yang-module-capabilities
- // This is more user friendly even though it clashes with attribute yang-module-capabilities
- // Some devices do not report all required models in hello message, but provide them
- final Set<SourceIdentifier> providedSourcesNotRequired = Sets.difference(providedSources, requiredSources);
+ // Here all the sources reported in netconf monitoring are merged with those reported in hello.
+ // It is necessary to perform this since submodules are not mentioned in hello but still required.
+ // This clashes with the option of a user to specify supported yang models manually in configuration for netconf-connector
+ // and as a result one is not able to fully override yang models of a device. It is only possible to add additional models.
+ final Set<QName> providedSourcesNotRequired = Sets.difference(providedSources, requiredSources);
if (!providedSourcesNotRequired.isEmpty()) {
logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}",
id, providedSourcesNotRequired);
logger.warn("{}: Adding provided but not required sources as required to prevent failures", id);
+ logger.debug("{}: Netconf device reported in hello: {}", id, requiredSources);
requiredSources.addAll(providedSourcesNotRequired);
}
/**
* Contains RequiredSources - sources from capabilities.
- *
*/
private static final class DeviceSources {
- private final Collection<SourceIdentifier> requiredSources;
- private final Collection<SourceIdentifier> providedSources;
+ private final Set<QName> requiredSources;
+ private final Set<QName> providedSources;
- public DeviceSources(final Collection<SourceIdentifier> requiredSources, final Collection<SourceIdentifier> providedSources) {
+ public DeviceSources(final Set<QName> requiredSources, final Set<QName> providedSources) {
this.requiredSources = requiredSources;
this.providedSources = providedSources;
}
- public Collection<SourceIdentifier> getRequiredSources() {
+ public Set<QName> getRequiredSourcesQName() {
return requiredSources;
}
- public Collection<SourceIdentifier> getProvidedSources() {
+ public Set<QName> getProvidedSourcesQName() {
return providedSources;
}
+ public Collection<SourceIdentifier> getRequiredSources() {
+ return Collections2.transform(requiredSources, QNAME_TO_SOURCE_ID_FUNCTION);
+ }
+
+ public Collection<SourceIdentifier> getProvidedSources() {
+ return Collections2.transform(providedSources, QNAME_TO_SOURCE_ID_FUNCTION);
+ }
+
}
/**
private final class RecursiveSchemaSetup implements Runnable {
private final DeviceSources deviceSources;
private final NetconfSessionPreferences remoteSessionCapabilities;
- private final NetconfDeviceRpc deviceRpc;
private final RemoteDeviceCommunicator<NetconfMessage> listener;
- private NetconfDeviceCapabilities capabilities;
+ private final NetconfDeviceCapabilities capabilities;
- public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceCommunicator<NetconfMessage> listener) {
this.deviceSources = deviceSources;
this.remoteSessionCapabilities = remoteSessionCapabilities;
- this.deviceRpc = deviceRpc;
this.listener = listener;
this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
}
// If no more sources, fail
if(requiredSources.isEmpty()) {
- handleSalInitializationFailure(new IllegalStateException(id + ": No more sources for schema context"), listener);
+ final IllegalStateException cause = new IllegalStateException(id + ": No more sources for schema context");
+ handleSalInitializationFailure(cause, listener);
+ salFacade.onDeviceFailed(cause);
return;
}
@Override
public void onSuccess(final SchemaContext result) {
logger.debug("{}: Schema context built successfully from {}", id, requiredSources);
- Collection<QName> filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet());
+ final Collection<QName> filteredQNames = Sets.difference(deviceSources.getProvidedSourcesQName(), capabilities.getUnresolvedCapabilites().keySet());
capabilities.addCapabilities(filteredQNames);
capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps());
- handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
+ handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result));
}
@Override
Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback);
}
+ private NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
+ return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
+ }
+
private Collection<SourceIdentifier> stripMissingSource(final Collection<SourceIdentifier> requiredSources, final SourceIdentifier sIdToRemove) {
final LinkedList<SourceIdentifier> sourceIdentifiers = Lists.newLinkedList(requiredSources);
final boolean removed = sourceIdentifiers.remove(sIdToRemove);
return sourceIdentifiers;
}
- private Collection<QName> getQNameFromSourceIdentifiers(Collection<SourceIdentifier> identifiers) {
- Collection<QName> qNames = new HashSet<>();
- for (SourceIdentifier source : identifiers) {
- Optional<QName> qname = getQNameFromSourceIdentifier(source);
- if (qname.isPresent()) {
- qNames.add(qname.get());
+ private Collection<QName> getQNameFromSourceIdentifiers(final Collection<SourceIdentifier> identifiers) {
+ final Collection<QName> qNames = Collections2.transform(identifiers, new Function<SourceIdentifier, QName>() {
+ @Override
+ public QName apply(final SourceIdentifier sourceIdentifier) {
+ return getQNameFromSourceIdentifier(sourceIdentifier);
}
- }
+ });
+
if (qNames.isEmpty()) {
logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers);
}
return qNames;
}
- private Optional<QName> getQNameFromSourceIdentifier(SourceIdentifier identifier) {
- for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) {
- if (qname.getLocalName().equals(identifier.getName())
- && qname.getFormattedRevision().equals(identifier.getRevision())) {
- return Optional.of(qname);
+ private QName getQNameFromSourceIdentifier(final SourceIdentifier identifier) {
+ // Required sources are all required and provided merged in DeviceSourcesResolver
+ for (final QName qname : deviceSources.getRequiredSourcesQName()) {
+ if(qname.getLocalName().equals(identifier.getName()) == false) {
+ continue;
+ }
+
+ if(identifier.getRevision().equals(SourceIdentifier.NOT_PRESENT_FORMATTED_REVISION) &&
+ qname.getRevision() == null) {
+ return qname;
+ }
+
+ if (qname.getFormattedRevision().equals(identifier.getRevision())) {
+ return qname;
}
}
- throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier);
+ throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier + " Available: " + deviceSources.getRequiredSourcesQName());
}
}
}