X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.java;h=b57a8912ccd1fe67875f6b6d37e571259b1c23f1;hb=8ce853c0627e829d40fe18e550bc807efbcbafee;hp=cc9eb5a851271c8ed221d94038b9db5e35f92059;hpb=971b179000ef1cc56699de35061cf6f97d4cf36f;p=controller.git diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index cc9eb5a851..b57a8912cc 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.sal.connect.netconf; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -20,21 +21,34 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.util.Collection; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.sal.connect.api.MessageTransformer; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider; +import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer; +import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.$YangModuleInfoImpl; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability; +import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext; import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException; import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; @@ -51,10 +65,30 @@ import org.slf4j.LoggerFactory; /** * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade */ -public final class NetconfDevice implements RemoteDevice { +public final class NetconfDevice implements RemoteDevice { private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class); + /** + * Initial schema context contains schemas for netconf monitoring and netconf notifications + */ + public static final SchemaContext INIT_SCHEMA_CTX; + + static { + try { + final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create(); + moduleInfoBackedContext.addModuleInfos( + Lists.newArrayList( + $YangModuleInfoImpl.getInstance(), + org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.$YangModuleInfoImpl.getInstance(), + org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.$YangModuleInfoImpl.getInstance())); + INIT_SCHEMA_CTX = moduleInfoBackedContext.tryToCreateSchemaContext().get(); + } catch (final RuntimeException e) { + logger.error("Unable to prepare schema context for netconf initialization", e); + throw new ExceptionInInitializerError(e); + } + } + public static final Function QNAME_TO_SOURCE_ID_FUNCTION = new Function() { @Override public SourceIdentifier apply(final QName input) { @@ -63,31 +97,48 @@ public final class NetconfDevice implements RemoteDevice salFacade; + private final RemoteDeviceHandler salFacade; private final ListeningExecutorService processingExecutor; private final SchemaSourceRegistry schemaRegistry; - private final MessageTransformer messageTransformer; private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; private final NotificationHandler notificationHandler; private final List> sourceRegistrations = Lists.newArrayList(); - public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ExecutorService globalProcessingExecutor, final MessageTransformer messageTransformer) { + // Message transformer is constructed once the schemas are available + private MessageTransformer messageTransformer; + + public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ExecutorService globalProcessingExecutor) { + this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, false); + } + + /** + * Create rpc implementation capable of handling RPC for monitoring and notifications even before the schemas of remote device are downloaded + */ + static NetconfDeviceRpc getRpcForInitialization(final NetconfDeviceCommunicator listener) { + return new NetconfDeviceRpc(INIT_SCHEMA_CTX, listener, new NetconfMessageTransformer(INIT_SCHEMA_CTX)); + } + + + // FIXME reduce parameters + public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) { this.id = id; + this.reconnectOnSchemasChange = reconnectOnSchemasChange; this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry(); - this.messageTransformer = messageTransformer; this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory(); this.salFacade = salFacade; this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver(); this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor); - this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id); + this.notificationHandler = new NotificationHandler(salFacade, id); } @Override - public void onRemoteSessionUp(final NetconfSessionCapabilities remoteSessionCapabilities, - final RemoteDeviceCommunicator listener) { + public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, + final NetconfDeviceCommunicator listener) { // SchemaContext setup has to be performed in a dedicated thread since // we are in a netty thread in this method // Yang models are being downloaded in this method and it would cause a @@ -95,20 +146,23 @@ public final class NetconfDevice implements RemoteDevice sourceResolverFuture = processingExecutor.submit(task); + if(shouldListenOnSchemaChange(remoteSessionCapabilities)) { + registerToBaseNetconfStream(initRpc, listener); + } + final FutureCallback resolvedSourceCallback = new FutureCallback() { @Override public void onSuccess(final DeviceSources result) { - addProvidedSourcesToSchemaRegistry(deviceRpc, result); + addProvidedSourcesToSchemaRegistry(initRpc, result); setUpSchema(result); } private void setUpSchema(final DeviceSources result) { - processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, deviceRpc, listener)); + processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, listener)); } @Override @@ -121,12 +175,52 @@ public final class NetconfDevice implements RemoteDevice rpcResultListenableFuture = + deviceRpc.invokeRpc(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME), NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT); + + final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() { + @Override + public Optional> filterNotification(final NormalizedNode notification) { + if (isCapabilityChanged(notification)) { + logger.info("{}: Schemas change detected, reconnecting", id); + // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting + listener.disconnect(); + return Optional.absent(); + } + return Optional.>of(notification); + } + + private boolean isCapabilityChanged(final NormalizedNode notification) { + return notification.getNodeType().equals(NetconfCapabilityChange.QNAME); + } + }; + + Futures.addCallback(rpcResultListenableFuture, new FutureCallback() { + @Override + public void onSuccess(final DOMRpcResult domRpcResult) { + notificationHandler.addNotificationFilter(filter); + } + + @Override + public void onFailure(final Throwable t) { + logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t); + } + }); + } + + private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) { + return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange; + } + + @VisibleForTesting + void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) { + messageTransformer = new NetconfMessageTransformer(result); + + updateTransformer(messageTransformer); + notificationHandler.onRemoteSchemaUp(messageTransformer); salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc); - notificationHandler.onRemoteSchemaUp(); - logger.debug("{}: Initialization in sal successful", id); logger.info("{}: Netconf connector initialized successfully", id); } @@ -134,14 +228,18 @@ public final class NetconfDevice implements RemoteDevice transformer) { + messageTransformer = transformer; } private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) { @@ -152,16 +250,20 @@ public final class NetconfDevice implements RemoteDevice listener) { - return new NetconfDeviceRpc(listener, messageTransformer); - } - @Override public void onRemoteSessionDown() { + notificationHandler.onRemoteSchemaDown(); + salFacade.onDeviceDisconnected(); for (final SchemaSourceRegistration sourceRegistration : sourceRegistrations) { sourceRegistration.close(); } + resetMessageTransformer(); + } + + @Override + public void onRemoteSessionFailed(final Throwable throwable) { + salFacade.onDeviceFailed(throwable); } @Override @@ -200,18 +302,24 @@ public final class NetconfDevice implements RemoteDevice { + private final NetconfDeviceRpc deviceRpc; - private final NetconfSessionCapabilities remoteSessionCapabilities; + private final NetconfSessionPreferences remoteSessionCapabilities; private final RemoteDeviceId id; private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; - public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { + DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, + final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { this.deviceRpc = deviceRpc; this.remoteSessionCapabilities = remoteSessionCapabilities; this.id = id; this.stateSchemasResolver = stateSchemasResolver; } + public DeviceSourcesResolver(final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver, final NetconfDeviceRpc rpcForMonitoring) { + this(rpcForMonitoring, remoteSessionCapabilities, id, stateSchemasResolver); + } + @Override public DeviceSources call() throws Exception { @@ -234,15 +342,16 @@ public final class NetconfDevice implements RemoteDevice providedSourcesNotRequired = Sets.difference(providedSources, requiredSources); if (!providedSourcesNotRequired.isEmpty()) { logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}", id, providedSourcesNotRequired); logger.warn("{}: Adding provided but not required sources as required to prevent failures", id); + logger.debug("{}: Netconf device reported in hello: {}", id, requiredSources); requiredSources.addAll(providedSourcesNotRequired); } @@ -252,7 +361,6 @@ public final class NetconfDevice implements RemoteDevice requiredSources; @@ -278,15 +386,15 @@ public final class NetconfDevice implements RemoteDevice listener; + private final NetconfDeviceCapabilities capabilities; - public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionCapabilities remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator listener) { + public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceCommunicator listener) { this.deviceSources = deviceSources; this.remoteSessionCapabilities = remoteSessionCapabilities; - this.deviceRpc = deviceRpc; this.listener = listener; + this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities(); } @Override @@ -297,6 +405,7 @@ public final class NetconfDevice implements RemoteDevice requiredSources) { logger.trace("{}: Trying to build schema context from {}", id, requiredSources); @@ -313,7 +422,10 @@ public final class NetconfDevice implements RemoteDevice filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet()); + capabilities.addCapabilities(filteredQNames); + capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps()); + handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result)); } @Override @@ -322,12 +434,15 @@ public final class NetconfDevice implements RemoteDevice unresolvedSources = resolutionException.getUnsatisfiedImports().keySet(); + capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve); logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports()); setUpSchema(resolutionException.getResolvedSources()); // unknown error, fail @@ -340,11 +455,39 @@ public final class NetconfDevice implements RemoteDevice stripMissingSource(final Collection requiredSources, final SourceIdentifier sIdToRemove) { final LinkedList sourceIdentifiers = Lists.newLinkedList(requiredSources); final boolean removed = sourceIdentifiers.remove(sIdToRemove); Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources); return sourceIdentifiers; } + + private Collection getQNameFromSourceIdentifiers(final Collection identifiers) { + final Collection qNames = new HashSet<>(); + for (final SourceIdentifier source : identifiers) { + final Optional qname = getQNameFromSourceIdentifier(source); + if (qname.isPresent()) { + qNames.add(qname.get()); + } + } + if (qNames.isEmpty()) { + logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers); + } + return qNames; + } + + private Optional getQNameFromSourceIdentifier(final SourceIdentifier identifier) { + for (final QName qname : remoteSessionCapabilities.getModuleBasedCaps()) { + if (qname.getLocalName().equals(identifier.getName()) + && qname.getFormattedRevision().equals(identifier.getRevision())) { + return Optional.of(qname); + } + } + throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier); + } } }