X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.java;h=9a5b239024c5bb0cbca3798de58ccc102a994224;hp=94beaed0dfc61aeb0c5f260c24a1a689ec9fd7a1;hb=83dfe301bf2a2b1eff6883a2af3282c95d5a752e;hpb=478ce1fa1dc30974b7cf23fd5258f1af5366d547 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index 94beaed0df..9a5b239024 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -7,541 +7,448 @@ */ package org.opendaylight.controller.sal.connect.netconf; -import static com.google.common.base.Preconditions.checkState; -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED; -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID; -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE; -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH; -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage; -import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap; - +import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.util.Collection; -import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; - -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; -import org.opendaylight.controller.md.sal.common.api.data.DataModification; -import org.opendaylight.controller.md.sal.common.api.data.DataReader; -import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; -import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; -import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration; -import org.opendaylight.controller.sal.binding.api.data.DataProviderService; -import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; -import org.opendaylight.controller.sal.core.api.Provider; -import org.opendaylight.controller.sal.core.api.RpcImplementation; -import org.opendaylight.controller.sal.core.api.data.DataBrokerService; -import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction; -import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance; -import org.opendaylight.controller.sal.core.api.mount.MountProvisionService; -import org.opendaylight.protocol.framework.ReconnectStrategy; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; -import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.sal.connect.api.MessageTransformer; +import org.opendaylight.controller.sal.connect.api.RemoteDevice; +import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; +import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; +import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; +import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider; +import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; +import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.Node; -import org.opendaylight.yangtools.yang.data.api.SimpleNode; -import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl; -import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; -import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl; -import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder; -import org.opendaylight.yangtools.yang.model.api.Module; -import org.opendaylight.yangtools.yang.model.api.RpcDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider; -import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider; -import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; -import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext; +import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation; +import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; +import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ListenableFuture; -import io.netty.util.concurrent.EventExecutor; - -public class NetconfDevice implements Provider, // - DataReader, // - DataCommitHandler, // - RpcImplementation, // - AutoCloseable { - - InetSocketAddress socketAddress; - - MountProvisionInstance mountInstance; - - EventExecutor eventExecutor; - - ExecutorService processingExecutor; - - InstanceIdentifier path; - - ReconnectStrategy reconnectStrategy; - - AbstractCachingSchemaSourceProvider schemaSourceProvider; - - private NetconfDeviceSchemaContextProvider deviceContextProvider; - - protected Logger logger; - - Registration> operReaderReg; - Registration> confReaderReg; - Registration> commitHandlerReg; - List rpcReg; - - String name; - - MountProvisionService mountService; - - NetconfClientDispatcher dispatcher; - - static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance(); - - SchemaSourceProvider remoteSourceProvider; - - private volatile DataBrokerService dataBroker; - - NetconfDeviceListener listener; - - private boolean rollbackSupported; +/** + * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade + */ +public final class NetconfDevice implements RemoteDevice { - private NetconfReconnectingClientConfiguration clientConfig; - private volatile DataProviderService dataProviderService; + private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class); - public NetconfDevice(String name) { - this.name = name; - this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name); - this.path = InstanceIdentifier.builder(INVENTORY_PATH) - .nodeWithKey(INVENTORY_NODE, Collections.singletonMap(INVENTORY_ID, name)).toInstance(); + public static final Function QNAME_TO_SOURCE_ID_FUNCTION = new Function() { + @Override + public SourceIdentifier apply(final QName input) { + return new SourceIdentifier(input.getLocalName(), Optional.fromNullable(input.getFormattedRevision())); + } + }; + + private final RemoteDeviceId id; + private final boolean reconnectOnSchemasChange; + + private final SchemaContextFactory schemaContextFactory; + private final RemoteDeviceHandler salFacade; + private final ListeningExecutorService processingExecutor; + private final SchemaSourceRegistry schemaRegistry; + private final MessageTransformer messageTransformer; + private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; + private final NotificationHandler notificationHandler; + private final List> sourceRegistrations = Lists.newArrayList(); + + public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ExecutorService globalProcessingExecutor, final MessageTransformer messageTransformer) { + this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, messageTransformer, false); + } + + // FIXME reduce parameters + public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + final ExecutorService globalProcessingExecutor, final MessageTransformer messageTransformer, final boolean reconnectOnSchemasChange) { + this.id = id; + this.reconnectOnSchemasChange = reconnectOnSchemasChange; + this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry(); + this.messageTransformer = messageTransformer; + this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory(); + this.salFacade = salFacade; + this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver(); + this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor); + this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id); } - public void start() { - checkState(dispatcher != null, "Dispatcher must be set."); - checkState(schemaSourceProvider != null, "Schema Source Provider must be set."); - checkState(eventExecutor != null, "Event executor must be set."); + @Override + public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, + final NetconfDeviceCommunicator listener) { + // SchemaContext setup has to be performed in a dedicated thread since + // we are in a netty thread in this method + // Yang models are being downloaded in this method and it would cause a + // deadlock if we used the netty thread + // http://netty.io/wiki/thread-model.html + logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities); + + final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(listener); + + final DeviceSourcesResolver task = new DeviceSourcesResolver(deviceRpc, remoteSessionCapabilities, id, stateSchemasResolver); + final ListenableFuture sourceResolverFuture = processingExecutor.submit(task); + + if(shouldListenOnSchemaChange(remoteSessionCapabilities)) { + registerToBaseNetconfStream(deviceRpc, listener); + } - Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener); - listener = (NetconfDeviceListener) clientConfig.getSessionListener(); + final FutureCallback resolvedSourceCallback = new FutureCallback() { + @Override + public void onSuccess(final DeviceSources result) { + addProvidedSourcesToSchemaRegistry(deviceRpc, result); + setUpSchema(result); + } - logger.info("Starting NETCONF Client {} for address {}", name, socketAddress); + private void setUpSchema(final DeviceSources result) { + processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, deviceRpc, listener)); + } - dispatcher.createReconnectingClient(clientConfig); - } + @Override + public void onFailure(final Throwable t) { + logger.warn("{}: Unexpected error resolving device sources: {}", id, t); + handleSalInitializationFailure(t, listener); + } + }; - Optional getSchemaContext() { - if (deviceContextProvider == null) { - return Optional.absent(); - } - return deviceContextProvider.currentContext; - } + Futures.addCallback(sourceResolverFuture, resolvedSourceCallback); - void bringDown() { - if (rpcReg != null) { - for (RpcRegistration reg : rpcReg) { - reg.close(); - } - rpcReg = null; - } - closeGracefully(confReaderReg); - confReaderReg = null; - closeGracefully(operReaderReg); - operReaderReg = null; - closeGracefully(commitHandlerReg); - commitHandlerReg = null; - - updateDeviceState(false, Collections. emptySet()); } - private void closeGracefully(final AutoCloseable resource) { - if (resource != null) { - try { - resource.close(); - } catch (Exception e) { - logger.warn("Ignoring exception while closing {}", resource, e); - } - } - } + private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) { + final ListenableFuture> rpcResultListenableFuture = + deviceRpc.invokeRpc(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME, NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT); - void bringUp(final SchemaSourceProvider delegate, final Set capabilities, final boolean rollbackSupported) { - // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener. - // Reason: delegate.getSchema blocks thread when waiting for response - // however, if the netty thread is blocked, no incoming message can be processed - // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html - // TODO redesign +refactor - processingExecutor.submit(new Runnable() { + final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() { @Override - public void run() { - NetconfDevice.this.rollbackSupported = rollbackSupported; - remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); - deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider); - deviceContextProvider.createContextFromCapabilities(capabilities); - if (mountInstance != null && getSchemaContext().isPresent()) { - mountInstance.setSchemaContext(getSchemaContext().get()); + public Optional filterNotification(final CompositeNode notification) { + if (isCapabilityChanged(notification)) { + logger.info("{}: Schemas change detected, reconnecting", id); + // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting + listener.disconnect(); + return Optional.absent(); } + return Optional.of(notification); + } - updateDeviceState(true, capabilities); + private boolean isCapabilityChanged(final CompositeNode notification) { + return notification.getNodeType().equals(NetconfCapabilityChange.QNAME); + } + }; - if (mountInstance != null) { - confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this); - operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this); - commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this); + Futures.addCallback(rpcResultListenableFuture, new FutureCallback>() { + @Override + public void onSuccess(final RpcResult result) { + notificationHandler.addNotificationFilter(filter); + } - List rpcs = new ArrayList<>(); - // TODO same condition twice - if (mountInstance != null && getSchemaContext().isPresent()) { - for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) { - rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this)); - } - } - rpcReg = rpcs; - } + @Override + public void onFailure(final Throwable t) { + logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t); } }); } - private void updateDeviceState(boolean up, Set capabilities) { - checkDataStoreState(); - - DataModificationTransaction transaction = dataBroker.beginTransaction(); + private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) { + return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange; + } - CompositeNodeBuilder it = ImmutableCompositeNode.builder(); - it.setQName(INVENTORY_NODE); - it.addLeaf(INVENTORY_ID, name); - it.addLeaf(INVENTORY_CONNECTED, up); + private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) { + updateMessageTransformer(result); + salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc); + notificationHandler.onRemoteSchemaUp(); - logger.debug("Client capabilities {}", capabilities); - for (QName capability : capabilities) { - it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString()); - } - - logger.debug("Update device state transaction " + transaction.getIdentifier() - + " putting operational data started."); - transaction.removeOperationalData(path); - transaction.putOperationalData(path, it.toInstance()); - logger.debug("Update device state transaction " + transaction.getIdentifier() - + " putting operational data ended."); - - // FIXME: this has to be asynchronous - RpcResult transactionStatus = null; - try { - transactionStatus = transaction.commit().get(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for response", e); - } catch (ExecutionException e) { - throw new RuntimeException("Read configuration data " + path + " failed", e); - } - // TODO better ex handling + logger.info("{}: Netconf connector initialized successfully", id); + } - if (transactionStatus.isSuccessful()) { - logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL."); - } else { - logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!"); - logger.debug("Update device state transaction status " + transaction.getStatus()); - } + private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator listener) { + logger.error("{}: Initialization in sal failed, disconnecting from device", id, t); + listener.close(); + onRemoteSessionDown(); + resetMessageTransformer(); } - @Override - public CompositeNode readConfigurationData(InstanceIdentifier path) { - RpcResult result = null; - try { - result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME, - wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for response", e); - } catch (ExecutionException e) { - throw new RuntimeException("Read configuration data " + path + " failed", e); - } + /** + * Set the schema context inside transformer to null as is in initial state + */ + private void resetMessageTransformer() { + updateMessageTransformer(null); + } - CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME); - return data == null ? null : (CompositeNode) findNode(data, path); + /** + * Update initial message transformer to use retrieved schema + */ + private void updateMessageTransformer(final SchemaContext currentSchemaContext) { + messageTransformer.onGlobalContextUpdated(currentSchemaContext); } - @Override - public CompositeNode readOperationalData(InstanceIdentifier path) { - RpcResult result = null; - try { - result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for response", e); - } catch (ExecutionException e) { - throw new RuntimeException("Read configuration data " + path + " failed", e); + private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) { + final NetconfRemoteSchemaYangSourceProvider yangProvider = new NetconfRemoteSchemaYangSourceProvider(id, deviceRpc); + for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) { + sourceRegistrations.add(schemaRegistry.registerSchemaSource(yangProvider, + PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))); } - - CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME); - return (CompositeNode) findNode(data, path); } - @Override - public Set getSupportedRpcs() { - return Collections.emptySet(); + private NetconfDeviceRpc setUpDeviceRpc(final RemoteDeviceCommunicator listener) { + return new NetconfDeviceRpc(listener, messageTransformer); } @Override - public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { - return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc); + public void onRemoteSessionDown() { + salFacade.onDeviceDisconnected(); + for (final SchemaSourceRegistration sourceRegistration : sourceRegistrations) { + sourceRegistration.close(); + } + resetMessageTransformer(); } @Override - public Collection getProviderFunctionality() { - return Collections.emptySet(); + public void onRemoteSessionFailed(Throwable throwable) { + salFacade.onDeviceFailed(throwable); } @Override - public void onSessionInitiated(ProviderSession session) { - dataBroker = session.getService(DataBrokerService.class); - - processingExecutor.submit(new Runnable() { - @Override - public void run() { - updateInitialState(); - } - }); - - mountService = session.getService(MountProvisionService.class); - if (mountService != null) { - mountInstance = mountService.createOrGetMountPoint(path); + public void onNotification(final NetconfMessage notification) { + notificationHandler.handleNotification(notification); + } + + /** + * Just a transfer object containing schema related dependencies. Injected in constructor. + */ + public static class SchemaResourcesDTO { + private final SchemaSourceRegistry schemaRegistry; + private final SchemaContextFactory schemaContextFactory; + private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; + + public SchemaResourcesDTO(final SchemaSourceRegistry schemaRegistry, final SchemaContextFactory schemaContextFactory, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { + this.schemaRegistry = Preconditions.checkNotNull(schemaRegistry); + this.schemaContextFactory = Preconditions.checkNotNull(schemaContextFactory); + this.stateSchemasResolver = Preconditions.checkNotNull(stateSchemasResolver); } - } - private void updateInitialState() { - checkDataStoreState(); - - DataModificationTransaction transaction = dataBroker.beginTransaction(); - if (operationalNodeNotExisting(transaction)) { - transaction.putOperationalData(path, getNodeWithId()); - } - if (configurationNodeNotExisting(transaction)) { - transaction.putConfigurationData(path, getNodeWithId()); + public SchemaSourceRegistry getSchemaRegistry() { + return schemaRegistry; } - try { - transaction.commit().get(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for response", e); - } catch (ExecutionException e) { - throw new RuntimeException("Read configuration data " + path + " failed", e); + public SchemaContextFactory getSchemaContextFactory() { + return schemaContextFactory; } - } - - private void checkDataStoreState() { - // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore - dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder( - Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build()); } - - CompositeNode getNodeWithId() { - SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name); - return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.> singletonList(id)); - } - boolean configurationNodeNotExisting(DataModificationTransaction transaction) { - return null == transaction.readConfigurationData(path); - } - - boolean operationalNodeNotExisting(DataModificationTransaction transaction) { - return null == transaction.readOperationalData(path); - } - - static Node findNode(CompositeNode node, InstanceIdentifier identifier) { - - Node current = node; - for (InstanceIdentifier.PathArgument arg : identifier.getPath()) { - if (current instanceof SimpleNode) { - return null; - } else if (current instanceof CompositeNode) { - CompositeNode currentComposite = (CompositeNode) current; - - current = currentComposite.getFirstCompositeByName(arg.getNodeType()); - if (current == null) { - current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision()); - } - if (current == null) { - current = currentComposite.getFirstSimpleByName(arg.getNodeType()); - } - if (current == null) { - current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision()); - } - if (current == null) { - return null; - } - } + public NetconfStateSchemas.NetconfStateSchemasResolver getStateSchemasResolver() { + return stateSchemasResolver; } - return current; } - @Override - public DataCommitTransaction requestCommit( - DataModification modification) { - NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, - modification, true, rollbackSupported); - try { - twoPhaseCommit.prepare(); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for response", e); - } catch (ExecutionException e) { - throw new RuntimeException("Read configuration data " + path + " failed", e); + /** + * Schema building callable. + */ + private static class DeviceSourcesResolver implements Callable { + private final NetconfDeviceRpc deviceRpc; + private final NetconfSessionPreferences remoteSessionCapabilities; + private final RemoteDeviceId id; + private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; + + public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { + this.deviceRpc = deviceRpc; + this.remoteSessionCapabilities = remoteSessionCapabilities; + this.id = id; + this.stateSchemasResolver = stateSchemasResolver; } - return twoPhaseCommit; - } - Set getCapabilities(Collection capabilities) { - return FluentIterable.from(capabilities).filter(new Predicate() { - @Override - public boolean apply(final String capability) { - return capability.contains("?") && capability.contains("module=") && capability.contains("revision="); - } - }).transform(new Function() { - @Override - public QName apply(final String capability) { - String[] parts = capability.split("\\?"); - String namespace = parts[0]; - FluentIterable queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&"))); + @Override + public DeviceSources call() throws Exception { - String revision = getStringAndTransform(queryParams, "revision=", "revision="); + final Set requiredSources = Sets.newHashSet(Collections2.transform( + remoteSessionCapabilities.getModuleBasedCaps(), QNAME_TO_SOURCE_ID_FUNCTION)); - String moduleName = getStringAndTransform(queryParams, "module=", "module="); + // If monitoring is not supported, we will still attempt to create schema, sources might be already provided + final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id); + logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames()); - if (revision == null) { - logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision="); - revision = getStringAndTransform(queryParams, "amp;revision==", "revision="); + final Set providedSources = Sets.newHashSet(Collections2.transform( + availableSchemas.getAvailableYangSchemasQNames(), QNAME_TO_SOURCE_ID_FUNCTION)); - if (revision != null) { - logger.warn("Netconf device returned revision incorectly escaped for {}", capability); - } - } - if (revision == null) { - return QName.create(URI.create(namespace), null, moduleName); - } - return QName.create(namespace, revision, moduleName); - } - - private String getStringAndTransform(final Iterable queryParams, final String match, - final String substringToRemove) { - Optional found = Iterables.tryFind(queryParams, new Predicate() { - @Override - public boolean apply(final String input) { - return input.startsWith(match); - } - }); + final Set requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources); - return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null; + if (!requiredSourcesNotProvided.isEmpty()) { + logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}", + id, requiredSourcesNotProvided); + logger.warn("{}: Attempting to build schema context from required sources", id); } - }).toSet(); - } - @Override - public void close() { - bringDown(); - } - - public String getName() { - return name; - } - - public InetSocketAddress getSocketAddress() { - return socketAddress; - } + // TODO should we perform this ? We have a mechanism to fix initialization of devices not reporting or required modules in hello + // That is overriding capabilities in configuration using attribute yang-module-capabilities + // This is more user friendly even though it clashes with attribute yang-module-capabilities + // Some devices do not report all required models in hello message, but provide them + final Set providedSourcesNotRequired = Sets.difference(providedSources, requiredSources); + if (!providedSourcesNotRequired.isEmpty()) { + logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}", + id, providedSourcesNotRequired); + logger.warn("{}: Adding provided but not required sources as required to prevent failures", id); + requiredSources.addAll(providedSourcesNotRequired); + } - public MountProvisionInstance getMountInstance() { - return mountInstance; + return new DeviceSources(requiredSources, providedSources); + } } - public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) { - this.reconnectStrategy = reconnectStrategy; - } + /** + * Contains RequiredSources - sources from capabilities. + * + */ + private static final class DeviceSources { + private final Collection requiredSources; + private final Collection providedSources; - public void setProcessingExecutor(final ExecutorService processingExecutor) { - this.processingExecutor = processingExecutor; - } + public DeviceSources(final Collection requiredSources, final Collection providedSources) { + this.requiredSources = requiredSources; + this.providedSources = providedSources; + } - public void setSocketAddress(final InetSocketAddress socketAddress) { - this.socketAddress = socketAddress; - } + public Collection getRequiredSources() { + return requiredSources; + } - public void setEventExecutor(final EventExecutor eventExecutor) { - this.eventExecutor = eventExecutor; - } + public Collection getProvidedSources() { + return providedSources; + } - public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider schemaSourceProvider) { - this.schemaSourceProvider = schemaSourceProvider; } - public void setDispatcher(final NetconfClientDispatcher dispatcher) { - this.dispatcher = dispatcher; - } + /** + * Schema builder that tries to build schema context from provided sources or biggest subset of it. + */ + private final class RecursiveSchemaSetup implements Runnable { + private final DeviceSources deviceSources; + private final NetconfSessionPreferences remoteSessionCapabilities; + private final NetconfDeviceRpc deviceRpc; + private final RemoteDeviceCommunicator listener; + private NetconfDeviceCapabilities capabilities; - public void setClientConfig(final NetconfReconnectingClientConfiguration clientConfig) { - this.clientConfig = clientConfig; - } + public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator listener) { + this.deviceSources = deviceSources; + this.remoteSessionCapabilities = remoteSessionCapabilities; + this.deviceRpc = deviceRpc; + this.listener = listener; + this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities(); + } - public void setDataProviderService(final DataProviderService dataProviderService) { - this.dataProviderService = dataProviderService; - } -} + @Override + public void run() { + setUpSchema(deviceSources.getRequiredSources()); + } -class NetconfDeviceSchemaContextProvider { + /** + * Recursively build schema context, in case of success or final failure notify device + */ + // FIXME reimplement without recursion + private void setUpSchema(final Collection requiredSources) { + logger.trace("{}: Trying to build schema context from {}", id, requiredSources); + + // If no more sources, fail + if(requiredSources.isEmpty()) { + handleSalInitializationFailure(new IllegalStateException(id + ": No more sources for schema context"), listener); + return; + } - NetconfDevice device; + final CheckedFuture schemaBuilderFuture = schemaContextFactory.createSchemaContext(requiredSources); - SchemaSourceProvider sourceProvider; + final FutureCallback RecursiveSchemaBuilderCallback = new FutureCallback() { - Optional currentContext; + @Override + public void onSuccess(final SchemaContext result) { + logger.debug("{}: Schema context built successfully from {}", id, requiredSources); + Collection filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet()); + capabilities.addCapabilities(filteredQNames); + capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps()); + handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc); + } - NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider sourceProvider) { - this.device = device; - this.sourceProvider = sourceProvider; - this.currentContext = Optional.absent(); - } + @Override + public void onFailure(final Throwable t) { + // In case source missing, try without it + if (t instanceof MissingSchemaSourceException) { + final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId(); + logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource); + capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), UnavailableCapability.FailureReason.MissingSource); + setUpSchema(stripMissingSource(requiredSources, missingSource)); + + // In case resolution error, try only with resolved sources + } else if (t instanceof SchemaResolutionException) { + // TODO check for infinite loop + final SchemaResolutionException resolutionException = (SchemaResolutionException) t; + final Set unresolvedSources = resolutionException.getUnsatisfiedImports().keySet(); + capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve); + logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports()); + setUpSchema(resolutionException.getResolvedSources()); + // unknown error, fail + } else { + handleSalInitializationFailure(t, listener); + } + } + }; - void createContextFromCapabilities(Iterable capabilities) { - YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider); - if (!sourceContext.getMissingSources().isEmpty()) { - device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources()); - } - device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources()); - List modelsToParse = YangSourceContext.getValidInputStreams(sourceContext); - if (!sourceContext.getValidSources().isEmpty()) { - SchemaContext schemaContext = tryToCreateContext(modelsToParse); - currentContext = Optional.fromNullable(schemaContext); - } else { - currentContext = Optional.absent(); + Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback); } - if (currentContext.isPresent()) { - device.logger.debug("Schema context successfully created."); + + private Collection stripMissingSource(final Collection requiredSources, final SourceIdentifier sIdToRemove) { + final LinkedList sourceIdentifiers = Lists.newLinkedList(requiredSources); + final boolean removed = sourceIdentifiers.remove(sIdToRemove); + Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources); + return sourceIdentifiers; } - } - SchemaContext tryToCreateContext(List modelsToParse) { - YangParserImpl parser = new YangParserImpl(); - try { + private Collection getQNameFromSourceIdentifiers(Collection identifiers) { + Collection qNames = new HashSet<>(); + for (SourceIdentifier source : identifiers) { + Optional qname = getQNameFromSourceIdentifier(source); + if (qname.isPresent()) { + qNames.add(qname.get()); + } + } + if (qNames.isEmpty()) { + logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers); + } + return qNames; + } - Set models = parser.parseYangModelsFromStreams(modelsToParse); - return parser.resolveSchemaContext(models); - } catch (Exception e) { - device.logger.debug("Error occured during parsing YANG schemas", e); - return null; + private Optional getQNameFromSourceIdentifier(SourceIdentifier identifier) { + for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) { + if (qname.getLocalName().equals(identifier.getName()) + && qname.getFormattedRevision().equals(identifier.getRevision())) { + return Optional.of(qname); + } + } + throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier); } } }