X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.java;h=ac84acb2f177a16d3aa110f54ee4bcc692f80664;hp=39340fa16630d63ffceb08061ce151431b5fab8b;hb=06e889c9c78457590b6a0b62d89a6b9f44242a9f;hpb=6b9ec89c77e614b44dbd01de77a45ee8d9e6d0ec diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index 39340fa166..ac84acb2f1 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.sal.connect.netconf; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -26,18 +27,28 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.sal.connect.api.MessageTransformer; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider; +import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer; +import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.$YangModuleInfoImpl; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability; +import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext; import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException; import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; @@ -54,10 +65,30 @@ import org.slf4j.LoggerFactory; /** * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade */ -public final class NetconfDevice implements RemoteDevice { +public final class NetconfDevice implements RemoteDevice { private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class); + /** + * Initial schema context contains schemas for netconf monitoring and netconf notifications + */ + public static final SchemaContext INIT_SCHEMA_CTX; + + static { + try { + final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create(); + moduleInfoBackedContext.addModuleInfos( + Lists.newArrayList( + $YangModuleInfoImpl.getInstance(), + org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.$YangModuleInfoImpl.getInstance(), + org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.$YangModuleInfoImpl.getInstance())); + INIT_SCHEMA_CTX = moduleInfoBackedContext.tryToCreateSchemaContext().get(); + } catch (final RuntimeException e) { + logger.error("Unable to prepare schema context for netconf initialization", e); + throw new ExceptionInInitializerError(e); + } + } + public static final Function QNAME_TO_SOURCE_ID_FUNCTION = new Function() { @Override public SourceIdentifier apply(final QName input) { @@ -66,31 +97,48 @@ public final class NetconfDevice implements RemoteDevice salFacade; private final ListeningExecutorService processingExecutor; private final SchemaSourceRegistry schemaRegistry; - private final MessageTransformer messageTransformer; private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; private final NotificationHandler notificationHandler; private final List> sourceRegistrations = Lists.newArrayList(); + // Message transformer is constructed once the schemas are available + private MessageTransformer messageTransformer; + + public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ExecutorService globalProcessingExecutor) { + this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, false); + } + + /** + * Create rpc implementation capable of handling RPC for monitoring and notifications even before the schemas of remote device are downloaded + */ + static NetconfDeviceRpc getRpcForInitialization(final NetconfDeviceCommunicator listener) { + return new NetconfDeviceRpc(INIT_SCHEMA_CTX, listener, new NetconfMessageTransformer(INIT_SCHEMA_CTX)); + } + + + // FIXME reduce parameters public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ExecutorService globalProcessingExecutor, final MessageTransformer messageTransformer) { + final ExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) { this.id = id; + this.reconnectOnSchemasChange = reconnectOnSchemasChange; this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry(); - this.messageTransformer = messageTransformer; this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory(); this.salFacade = salFacade; this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver(); this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor); - this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id); + this.notificationHandler = new NotificationHandler(salFacade, id); } @Override public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, - final RemoteDeviceCommunicator listener) { + final NetconfDeviceCommunicator listener) { // SchemaContext setup has to be performed in a dedicated thread since // we are in a netty thread in this method // Yang models are being downloaded in this method and it would cause a @@ -98,20 +146,23 @@ public final class NetconfDevice implements RemoteDevice sourceResolverFuture = processingExecutor.submit(task); + if(shouldListenOnSchemaChange(remoteSessionCapabilities)) { + registerToBaseNetconfStream(initRpc, listener); + } + final FutureCallback resolvedSourceCallback = new FutureCallback() { @Override public void onSuccess(final DeviceSources result) { - addProvidedSourcesToSchemaRegistry(deviceRpc, result); + addProvidedSourcesToSchemaRegistry(initRpc, result); setUpSchema(result); } private void setUpSchema(final DeviceSources result) { - processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, deviceRpc, listener)); + processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, listener)); } @Override @@ -122,15 +173,56 @@ public final class NetconfDevice implements RemoteDevice rpcResultListenableFuture = + deviceRpc.invokeRpc(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME), NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT); + + final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() { + @Override + public Optional> filterNotification(final NormalizedNode notification) { + if (isCapabilityChanged(notification)) { + logger.info("{}: Schemas change detected, reconnecting", id); + // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting + listener.disconnect(); + return Optional.absent(); + } + return Optional.>of(notification); + } + + private boolean isCapabilityChanged(final NormalizedNode notification) { + return notification.getNodeType().equals(NetconfCapabilityChange.QNAME); + } + }; + + Futures.addCallback(rpcResultListenableFuture, new FutureCallback() { + @Override + public void onSuccess(final DOMRpcResult domRpcResult) { + notificationHandler.addNotificationFilter(filter); + } + + @Override + public void onFailure(final Throwable t) { + logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t); + } + }); + } + + private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) { + return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange; } - private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) { - updateMessageTransformer(result); + @VisibleForTesting + void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) { + messageTransformer = new NetconfMessageTransformer(result); + + updateTransformer(messageTransformer); + notificationHandler.onRemoteSchemaUp(messageTransformer); salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc); - notificationHandler.onRemoteSchemaUp(); - logger.debug("{}: Initialization in sal successful", id); logger.info("{}: Netconf connector initialized successfully", id); } @@ -142,18 +234,14 @@ public final class NetconfDevice implements RemoteDevice transformer) { + messageTransformer = transformer; } private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) { @@ -164,12 +252,10 @@ public final class NetconfDevice implements RemoteDevice listener) { - return new NetconfDeviceRpc(listener, messageTransformer); - } - @Override public void onRemoteSessionDown() { + notificationHandler.onRemoteSchemaDown(); + salFacade.onDeviceDisconnected(); for (final SchemaSourceRegistration sourceRegistration : sourceRegistrations) { sourceRegistration.close(); @@ -178,7 +264,7 @@ public final class NetconfDevice implements RemoteDevice { + private final NetconfDeviceRpc deviceRpc; private final NetconfSessionPreferences remoteSessionCapabilities; private final RemoteDeviceId id; private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; - public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { + DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, + final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { this.deviceRpc = deviceRpc; this.remoteSessionCapabilities = remoteSessionCapabilities; this.id = id; this.stateSchemasResolver = stateSchemasResolver; } + public DeviceSourcesResolver(final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver, final NetconfDeviceRpc rpcForMonitoring) { + this(rpcForMonitoring, remoteSessionCapabilities, id, stateSchemasResolver); + } + @Override public DeviceSources call() throws Exception { @@ -252,15 +344,16 @@ public final class NetconfDevice implements RemoteDevice providedSourcesNotRequired = Sets.difference(providedSources, requiredSources); if (!providedSourcesNotRequired.isEmpty()) { logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}", id, providedSourcesNotRequired); logger.warn("{}: Adding provided but not required sources as required to prevent failures", id); + logger.debug("{}: Netconf device reported in hello: {}", id, requiredSources); requiredSources.addAll(providedSourcesNotRequired); } @@ -270,7 +363,6 @@ public final class NetconfDevice implements RemoteDevice requiredSources; @@ -297,14 +389,12 @@ public final class NetconfDevice implements RemoteDevice listener; - private NetconfDeviceCapabilities capabilities; + private final NetconfDeviceCapabilities capabilities; - public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator listener) { + public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceCommunicator listener) { this.deviceSources = deviceSources; this.remoteSessionCapabilities = remoteSessionCapabilities; - this.deviceRpc = deviceRpc; this.listener = listener; this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities(); } @@ -334,10 +424,10 @@ public final class NetconfDevice implements RemoteDevice filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet()); + final Collection filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet()); capabilities.addCapabilities(filteredQNames); capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps()); - handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc); + handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result)); } @Override @@ -346,7 +436,7 @@ public final class NetconfDevice implements RemoteDevice unresolvedSources = resolutionException.getUnsatisfiedImports().keySet(); - capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), FailureReason.UnableToResolve); + capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve); logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports()); setUpSchema(resolutionException.getResolvedSources()); // unknown error, fail @@ -367,6 +457,10 @@ public final class NetconfDevice implements RemoteDevice stripMissingSource(final Collection requiredSources, final SourceIdentifier sIdToRemove) { final LinkedList sourceIdentifiers = Lists.newLinkedList(requiredSources); final boolean removed = sourceIdentifiers.remove(sIdToRemove); @@ -374,10 +468,10 @@ public final class NetconfDevice implements RemoteDevice getQNameFromSourceIdentifiers(Collection identifiers) { - Collection qNames = new HashSet<>(); - for (SourceIdentifier source : identifiers) { - Optional qname = getQNameFromSourceIdentifier(source); + private Collection getQNameFromSourceIdentifiers(final Collection identifiers) { + final Collection qNames = new HashSet<>(); + for (final SourceIdentifier source : identifiers) { + final Optional qname = getQNameFromSourceIdentifier(source); if (qname.isPresent()) { qNames.add(qname.get()); } @@ -388,8 +482,8 @@ public final class NetconfDevice implements RemoteDevice getQNameFromSourceIdentifier(SourceIdentifier identifier) { - for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) { + private Optional getQNameFromSourceIdentifier(final SourceIdentifier identifier) { + for (final QName qname : remoteSessionCapabilities.getModuleBasedCaps()) { if (qname.getLocalName().equals(identifier.getName()) && qname.getFormattedRevision().equals(identifier.getRevision())) { return Optional.of(qname);