X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.xtend;h=c9fb1fc0b895ffabcd609eed14424e7411856413;hp=49d9757f421058cb05ffed90f81922f6509ff688;hb=8ffb3e08bfa609405e883444480642b93c83242a;hpb=144c567aa78ca4f6ea6279163b3a8ba8d5de0dc2 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend index 49d9757f42..c9fb1fc0b8 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend @@ -1,29 +1,66 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.connect.netconf -import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier -import org.opendaylight.controller.md.sal.common.api.data.DataReader -import org.opendaylight.yangtools.yang.data.api.CompositeNode -import org.opendaylight.controller.netconf.client.NetconfClient -import org.opendaylight.controller.sal.core.api.RpcImplementation -import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.* +import com.google.common.base.Optional +import com.google.common.collect.FluentIterable +import io.netty.util.concurrent.EventExecutor +import java.io.InputStream import java.net.InetSocketAddress -import org.opendaylight.yangtools.yang.data.api.Node -import org.opendaylight.yangtools.yang.data.api.SimpleNode -import org.opendaylight.yangtools.yang.common.QName +import java.net.URI import java.util.Collections +import java.util.List +import java.util.Set +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler +import org.opendaylight.controller.md.sal.common.api.data.DataModification +import org.opendaylight.controller.md.sal.common.api.data.DataReader +import org.opendaylight.controller.netconf.api.NetconfMessage +import org.opendaylight.controller.netconf.client.NetconfClient import org.opendaylight.controller.netconf.client.NetconfClientDispatcher -import org.opendaylight.yangtools.concepts.Registration -import org.opendaylight.controller.sal.core.api.Provider +import org.opendaylight.controller.netconf.util.xml.XmlUtil import org.opendaylight.controller.sal.core.api.Broker.ProviderSession -import org.opendaylight.controller.sal.core.api.mount.MountProvisionService -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*; +import org.opendaylight.controller.sal.core.api.Provider +import org.opendaylight.controller.sal.core.api.RpcImplementation import org.opendaylight.controller.sal.core.api.data.DataBrokerService import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction -import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl +import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance +import org.opendaylight.controller.sal.core.api.mount.MountProvisionService +import org.opendaylight.protocol.framework.ReconnectStrategy +import org.opendaylight.yangtools.concepts.Registration +import org.opendaylight.yangtools.yang.common.QName +import org.opendaylight.yangtools.yang.data.api.CompositeNode +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier +import org.opendaylight.yangtools.yang.data.api.Node +import org.opendaylight.yangtools.yang.data.api.SimpleNode import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl +import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode +import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl +import org.opendaylight.yangtools.yang.model.api.SchemaContext +import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders +import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl +import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import static com.google.common.base.Preconditions.* +import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.* + +import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.* -class NetconfDevice implements Provider, DataReader, RpcImplementation, AutoCloseable { +class NetconfDevice implements Provider, // +DataReader, // +DataCommitHandler, // +RpcImplementation, // +AutoCloseable { var NetconfClient client; @@ -33,31 +70,139 @@ class NetconfDevice implements Provider, DataReader> operReaderReg + @Property + var ReconnectStrategy reconnectStrategy; - Registration> confReaderReg + @Property + var AbstractCachingSchemaSourceProvider schemaSourceProvider; - String name + @Property + private NetconfDeviceSchemaContextProvider deviceContextProvider + + protected val Logger logger + Registration> operReaderReg + Registration> confReaderReg + Registration> commitHandlerReg + + val String name MountProvisionService mountService + int messegeRetryCount = 5; + + int messageTimeoutCount = 5 * 1000; + + Set cachedCapabilities + + @Property + var NetconfClientDispatcher dispatcher + + static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance(); + + @Property + var SchemaSourceProvider remoteSourceProvider + + DataBrokerService dataBroker + public new(String name) { this.name = name; + this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name); this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE, Collections.singletonMap(INVENTORY_ID, name)).toInstance; } - def start(NetconfClientDispatcher dispatcher) { - client = new NetconfClient(name, socketAddress, dispatcher); - confReaderReg = mountInstance.registerConfigurationReader(path, this); - operReaderReg = mountInstance.registerOperationalReader(path, this); + def start() { + checkState(dispatcher != null, "Dispatcher must be set."); + checkState(schemaSourceProvider != null, "Schema Source Provider must be set.") + checkState(eventExecutor != null, "Event executor must be set."); + + val listener = new NetconfDeviceListener(this); + val task = startClientTask(dispatcher, listener) + if (mountInstance != null) { + commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this) + } + return processingExecutor.submit(task) as Future; + + //commitHandlerReg = mountInstance.registerCommitHandler(path,this); + } + + def Optional getSchemaContext() { + if (deviceContextProvider == null) { + return Optional.absent(); + } + return deviceContextProvider.currentContext; + } + + private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) { + return [ | + try { + logger.info("Starting Netconf Client on: {}", socketAddress); + client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener); + logger.debug("Initial capabilities {}", initialCapabilities); + var SchemaSourceProvider delegate; + if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) { + delegate = new NetconfRemoteSchemaSourceProvider(this); + } else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) { + delegate = new NetconfRemoteSchemaSourceProvider(this); + } else { + logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress); + delegate = SchemaSourceProviders.noopProvider(); + } + remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); + deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider); + deviceContextProvider.createContextFromCapabilities(initialCapabilities); + if (mountInstance != null && schemaContext.isPresent) { + mountInstance.schemaContext = schemaContext.get(); + } + updateDeviceState() + if (mountInstance != null && confReaderReg == null && operReaderReg == null) { + confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); + operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); + } + } catch (Exception e) { + logger.error("Netconf client NOT started. ", e) + } + ] + } + + private def updateDeviceState() { + val transaction = dataBroker.beginTransaction + + val it = ImmutableCompositeNode.builder + setQName(INVENTORY_NODE) + addLeaf(INVENTORY_ID, name) + addLeaf(INVENTORY_CONNECTED, client.clientSession.up) + + logger.debug("Client capabilities {}", client.capabilities) + for (capability : client.capabilities) { + addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability) + } + + logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.") + transaction.putOperationalData(path, it.toInstance) + logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.") + val transactionStatus = transaction.commit.get; + + if (transactionStatus.successful) { + logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.") + } else { + logger.debug("Update device state transaction " + transaction.identifier + " FAILED!") + logger.debug("Update device state transaction status " + transaction.status) + } } override readConfigurationData(InstanceIdentifier path) { - val result = invokeRpc(NETCONF_GET_CONFIG_QNAME, wrap(NETCONF_GET_CONFIG_QNAME, path.toFilterStructure())); + val result = invokeRpc(NETCONF_GET_CONFIG_QNAME, + wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())); val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME); return data?.findNode(path) as CompositeNode; } @@ -72,10 +217,30 @@ class NetconfDevice implements Provider, DataReader findNode(CompositeNode node, InstanceIdentifier identifier) { + static def Node findNode(CompositeNode node, InstanceIdentifier identifier) { var Node current = node; for (arg : identifier.path) { @@ -120,12 +283,17 @@ class NetconfDevice implements Provider, DataReader modification) { + val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification); + twoPhaseCommit.prepare() + return twoPhaseCommit; + } + + def getInitialCapabilities() { + val capabilities = client?.capabilities; + if (capabilities == null) { + return null; + } + if (cachedCapabilities == null) { + cachedCapabilities = FluentIterable.from(capabilities).filter[ + contains("?") && contains("module=") && contains("revision=")].transform [ + val parts = split("\\?"); + val namespace = parts.get(0); + val queryParams = FluentIterable.from(parts.get(1).split("&")); + var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", ""); + val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", ""); + if (revision === null) { + logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision="); + revision = queryParams.findFirst[startsWith("&revision=")]?.replaceAll("revision=", ""); + if (revision != null) { + logger.warn("Netconf device returned revision incorectly escaped for {}", it) + } + } + if (revision == null) { + return QName.create(URI.create(namespace), null, moduleName); + } + return QName.create(namespace, revision, moduleName); + ].toSet(); + } + return cachedCapabilities; + } + override close() { confReaderReg?.close() operReaderReg?.close() @@ -140,3 +343,53 @@ class NetconfDevice implements Provider, DataReader sourceProvider; + + @Property + var Optional currentContext; + + new(NetconfDevice device, SchemaSourceProvider sourceProvider) { + _device = device + _sourceProvider = sourceProvider + _currentContext = Optional.absent(); + } + + def createContextFromCapabilities(Iterable capabilities) { + val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider) + if (!sourceContext.missingSources.empty) { + device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources); + } + device.logger.debug("Trying to create schema context from {}", sourceContext.validSources) + val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext); + if (!sourceContext.validSources.empty) { + val schemaContext = tryToCreateContext(modelsToParse); + currentContext = Optional.fromNullable(schemaContext); + } else { + currentContext = Optional.absent(); + } + if (currentContext.present) { + device.logger.debug("Schema context successfully created."); + } + + } + + def SchemaContext tryToCreateContext(List modelsToParse) { + val parser = new YangParserImpl(); + try { + + val models = parser.parseYangModelsFromStreams(modelsToParse); + val result = parser.resolveSchemaContext(models); + return result; + } catch (Exception e) { + device.logger.debug("Error occured during parsing YANG schemas", e); + return null; + } + } +}