X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.java;h=b57a8912ccd1fe67875f6b6d37e571259b1c23f1;hp=281f82ba2aa04d9085d9127e4aeb7ad10cb8d400;hb=8ce853c0627e829d40fe18e550bc807efbcbafee;hpb=cccc148d3cd2b5a2a7c86da2e74f2ea43fa00b93 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index 281f82ba2a..b57a8912cc 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.sal.connect.netconf; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -26,6 +27,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.sal.connect.api.MessageTransformer; import org.opendaylight.controller.sal.connect.api.RemoteDevice; @@ -36,13 +40,15 @@ import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCom import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider; +import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer; import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.$YangModuleInfoImpl; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability; +import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException; import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; @@ -63,6 +69,26 @@ public final class NetconfDevice implements RemoteDevice QNAME_TO_SOURCE_ID_FUNCTION = new Function() { @Override public SourceIdentifier apply(final QName input) { @@ -77,28 +103,37 @@ public final class NetconfDevice implements RemoteDevice salFacade; private final ListeningExecutorService processingExecutor; private final SchemaSourceRegistry schemaRegistry; - private final MessageTransformer messageTransformer; private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; private final NotificationHandler notificationHandler; private final List> sourceRegistrations = Lists.newArrayList(); + // Message transformer is constructed once the schemas are available + private MessageTransformer messageTransformer; + public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ExecutorService globalProcessingExecutor, final MessageTransformer messageTransformer) { - this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, messageTransformer, false); + final ExecutorService globalProcessingExecutor) { + this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, false); } + /** + * Create rpc implementation capable of handling RPC for monitoring and notifications even before the schemas of remote device are downloaded + */ + static NetconfDeviceRpc getRpcForInitialization(final NetconfDeviceCommunicator listener) { + return new NetconfDeviceRpc(INIT_SCHEMA_CTX, listener, new NetconfMessageTransformer(INIT_SCHEMA_CTX)); + } + + // FIXME reduce parameters public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ExecutorService globalProcessingExecutor, final MessageTransformer messageTransformer, final boolean reconnectOnSchemasChange) { + final ExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) { this.id = id; this.reconnectOnSchemasChange = reconnectOnSchemasChange; this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry(); - this.messageTransformer = messageTransformer; this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory(); this.salFacade = salFacade; this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver(); this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor); - this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id); + this.notificationHandler = new NotificationHandler(salFacade, id); } @Override @@ -111,24 +146,23 @@ public final class NetconfDevice implements RemoteDevice sourceResolverFuture = processingExecutor.submit(task); if(shouldListenOnSchemaChange(remoteSessionCapabilities)) { - registerToBaseNetconfStream(deviceRpc, listener); + registerToBaseNetconfStream(initRpc, listener); } final FutureCallback resolvedSourceCallback = new FutureCallback() { @Override public void onSuccess(final DeviceSources result) { - addProvidedSourcesToSchemaRegistry(deviceRpc, result); + addProvidedSourcesToSchemaRegistry(initRpc, result); setUpSchema(result); } private void setUpSchema(final DeviceSources result) { - processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, deviceRpc, listener)); + processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, listener)); } @Override @@ -139,33 +173,32 @@ public final class NetconfDevice implements RemoteDevice> rpcResultListenableFuture = - deviceRpc.invokeRpc(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME, NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT); + final CheckedFuture rpcResultListenableFuture = + deviceRpc.invokeRpc(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME), NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT); final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() { @Override - public Optional filterNotification(final CompositeNode notification) { + public Optional> filterNotification(final NormalizedNode notification) { if (isCapabilityChanged(notification)) { logger.info("{}: Schemas change detected, reconnecting", id); // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting listener.disconnect(); return Optional.absent(); } - return Optional.of(notification); + return Optional.>of(notification); } - private boolean isCapabilityChanged(final CompositeNode notification) { + private boolean isCapabilityChanged(final NormalizedNode notification) { return notification.getNodeType().equals(NetconfCapabilityChange.QNAME); } }; - Futures.addCallback(rpcResultListenableFuture, new FutureCallback>() { + Futures.addCallback(rpcResultListenableFuture, new FutureCallback() { @Override - public void onSuccess(final RpcResult result) { + public void onSuccess(final DOMRpcResult domRpcResult) { notificationHandler.addNotificationFilter(filter); } @@ -180,10 +213,13 @@ public final class NetconfDevice implements RemoteDevice transformer) { + messageTransformer = transformer; } private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) { @@ -217,10 +250,6 @@ public final class NetconfDevice implements RemoteDevice listener) { - return new NetconfDeviceRpc(listener, messageTransformer); - } - @Override public void onRemoteSessionDown() { notificationHandler.onRemoteSchemaDown(); @@ -233,7 +262,7 @@ public final class NetconfDevice implements RemoteDevice { + private final NetconfDeviceRpc deviceRpc; private final NetconfSessionPreferences remoteSessionCapabilities; private final RemoteDeviceId id; private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; - public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { + DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, + final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { this.deviceRpc = deviceRpc; this.remoteSessionCapabilities = remoteSessionCapabilities; this.id = id; this.stateSchemasResolver = stateSchemasResolver; } + public DeviceSourcesResolver(final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver, final NetconfDeviceRpc rpcForMonitoring) { + this(rpcForMonitoring, remoteSessionCapabilities, id, stateSchemasResolver); + } + @Override public DeviceSources call() throws Exception { @@ -307,10 +342,10 @@ public final class NetconfDevice implements RemoteDevice providedSourcesNotRequired = Sets.difference(providedSources, requiredSources); if (!providedSourcesNotRequired.isEmpty()) { logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}", @@ -326,7 +361,6 @@ public final class NetconfDevice implements RemoteDevice requiredSources; @@ -353,14 +387,12 @@ public final class NetconfDevice implements RemoteDevice listener; - private NetconfDeviceCapabilities capabilities; + private final NetconfDeviceCapabilities capabilities; - public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator listener) { + public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceCommunicator listener) { this.deviceSources = deviceSources; this.remoteSessionCapabilities = remoteSessionCapabilities; - this.deviceRpc = deviceRpc; this.listener = listener; this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities(); } @@ -390,10 +422,10 @@ public final class NetconfDevice implements RemoteDevice filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet()); + final Collection filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet()); capabilities.addCapabilities(filteredQNames); capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps()); - handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc); + handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result)); } @Override @@ -423,6 +455,10 @@ public final class NetconfDevice implements RemoteDevice stripMissingSource(final Collection requiredSources, final SourceIdentifier sIdToRemove) { final LinkedList sourceIdentifiers = Lists.newLinkedList(requiredSources); final boolean removed = sourceIdentifiers.remove(sIdToRemove); @@ -430,10 +466,10 @@ public final class NetconfDevice implements RemoteDevice getQNameFromSourceIdentifiers(Collection identifiers) { - Collection qNames = new HashSet<>(); - for (SourceIdentifier source : identifiers) { - Optional qname = getQNameFromSourceIdentifier(source); + private Collection getQNameFromSourceIdentifiers(final Collection identifiers) { + final Collection qNames = new HashSet<>(); + for (final SourceIdentifier source : identifiers) { + final Optional qname = getQNameFromSourceIdentifier(source); if (qname.isPresent()) { qNames.add(qname.get()); } @@ -444,8 +480,8 @@ public final class NetconfDevice implements RemoteDevice getQNameFromSourceIdentifier(SourceIdentifier identifier) { - for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) { + private Optional getQNameFromSourceIdentifier(final SourceIdentifier identifier) { + for (final QName qname : remoteSessionCapabilities.getModuleBasedCaps()) { if (qname.getLocalName().equals(identifier.getName()) && qname.getFormattedRevision().equals(identifier.getRevision())) { return Optional.of(qname);