X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.xtend;h=12e8b5587caa230cb693610d21f8135f61e9e2e9;hp=bfe352ad41322cf78404426a5582afc22a4e074d;hb=9c9d6e69da3aff2d0576d8c15ea0fa0692595b6d;hpb=85271ecd554bd278759c2b0d03f0e1d4d7b3ec78 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend index bfe352ad41..12e8b5587c 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend @@ -1,70 +1,65 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.connect.netconf -import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier -import org.opendaylight.controller.md.sal.common.api.data.DataReader -import org.opendaylight.yangtools.yang.data.api.CompositeNode -import org.opendaylight.controller.netconf.client.NetconfClient -import org.opendaylight.controller.sal.core.api.RpcImplementation -import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.* +import com.google.common.base.Optional +import com.google.common.collect.FluentIterable +import io.netty.util.concurrent.EventExecutor +import java.io.InputStream import java.net.InetSocketAddress -import org.opendaylight.yangtools.yang.data.api.Node -import org.opendaylight.yangtools.yang.data.api.SimpleNode -import org.opendaylight.yangtools.yang.common.QName +import java.net.URI +import java.util.ArrayList +import java.util.Collection import java.util.Collections +import java.util.List +import java.util.Set +import java.util.concurrent.ExecutorService +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler +import org.opendaylight.controller.md.sal.common.api.data.DataModification +import org.opendaylight.controller.md.sal.common.api.data.DataReader import org.opendaylight.controller.netconf.client.NetconfClientDispatcher -import org.opendaylight.yangtools.concepts.Registration -import org.opendaylight.controller.sal.core.api.Provider import org.opendaylight.controller.sal.core.api.Broker.ProviderSession -import org.opendaylight.controller.sal.core.api.mount.MountProvisionService -import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*; +import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration +import org.opendaylight.controller.sal.core.api.Provider +import org.opendaylight.controller.sal.core.api.RpcImplementation import org.opendaylight.controller.sal.core.api.data.DataBrokerService import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction -import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl -import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl +import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance +import org.opendaylight.controller.sal.core.api.mount.MountProvisionService import org.opendaylight.protocol.framework.ReconnectStrategy -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler -import org.opendaylight.controller.md.sal.common.api.data.DataModification -import com.google.common.collect.FluentIterable -import org.opendaylight.yangtools.yang.model.api.SchemaContext +import org.opendaylight.yangtools.concepts.Registration +import org.opendaylight.yangtools.yang.common.QName +import org.opendaylight.yangtools.yang.data.api.CompositeNode +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier +import org.opendaylight.yangtools.yang.data.api.Node +import org.opendaylight.yangtools.yang.data.api.SimpleNode +import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState +import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl +import org.opendaylight.yangtools.yang.model.api.SchemaContext +import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl -import java.io.InputStream -import org.slf4j.LoggerFactory +import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext import org.slf4j.Logger -import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener -import org.opendaylight.controller.netconf.client.NetconfClientSession -import org.opendaylight.controller.netconf.api.NetconfMessage -import io.netty.util.concurrent.EventExecutor +import org.slf4j.LoggerFactory -import java.util.Map -import java.util.Set -import com.google.common.collect.ImmutableMap +import static com.google.common.base.Preconditions.* +import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.* -import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider -import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider -import com.google.common.base.Optional -import com.google.common.collect.ImmutableList -import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders -import static com.google.common.base.Preconditions.*; -import java.util.concurrent.ExecutorService -import java.util.concurrent.Future -import org.opendaylight.controller.netconf.client.NetconfClientSessionListener -import io.netty.util.concurrent.Promise -import org.opendaylight.controller.netconf.util.xml.XmlElement -import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants -import java.util.concurrent.ExecutionException -import java.util.concurrent.locks.ReentrantLock - -class NetconfDevice implements Provider, // +import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.* + +class NetconfDevice implements Provider, // DataReader, // DataCommitHandler, // RpcImplementation, // AutoCloseable { - var NetconfClient client; - @Property var InetSocketAddress socketAddress; @@ -86,30 +81,35 @@ AutoCloseable { @Property var AbstractCachingSchemaSourceProvider schemaSourceProvider; - private NetconfDeviceSchemaContextProvider schemaContextProvider + @Property + private NetconfDeviceSchemaContextProvider deviceContextProvider protected val Logger logger Registration> operReaderReg Registration> confReaderReg Registration> commitHandlerReg + List rpcReg + @Property val String name - MountProvisionService mountService - int messegeRetryCount = 5; - - int messageTimeoutCount = 5 * 1000; - - Set cachedCapabilities + MountProvisionService mountService @Property var NetconfClientDispatcher dispatcher - + static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance(); + @Property + var SchemaSourceProvider remoteSourceProvider + + DataBrokerService dataBroker + + var NetconfDeviceListener listener; + public new(String name) { - this.name = name; + this._name = name; this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name); this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE, Collections.singletonMap(INVENTORY_ID, name)).toInstance; @@ -120,54 +120,98 @@ AutoCloseable { checkState(schemaSourceProvider != null, "Schema Source Provider must be set.") checkState(eventExecutor != null, "Event executor must be set."); - val listener = new NetconfDeviceListener(this,eventExecutor); - val task = startClientTask(dispatcher, listener) - if(mountInstance != null) { - confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); - operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); - } - return processingExecutor.submit(task) as Future; + listener = new NetconfDeviceListener(this); + + logger.info("Starting NETCONF Client {} for address {}", name, socketAddress); - //commitHandlerReg = mountInstance.registerCommitHandler(path,this); + dispatcher.createClient(socketAddress, listener, reconnectStrategy); } def Optional getSchemaContext() { - if (schemaContextProvider == null) { + if (deviceContextProvider == null) { return Optional.absent(); } - return schemaContextProvider.currentContext; + return deviceContextProvider.currentContext; } - private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) { - return [ | - logger.info("Starting Netconf Client on: {}", socketAddress); - client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener); - logger.debug("Initial capabilities {}", initialCapabilities); - var SchemaSourceProvider delegate; - if (initialCapabilities.contains(NetconfMapping.IETF_NETCONF_MONITORING_MODULE)) { - delegate = new NetconfDeviceSchemaSourceProvider(this); - } else { - logger.info("Device does not support IETF Netconf Monitoring.", socketAddress); - delegate = SchemaSourceProviders.noopProvider(); + def bringDown() { + if (rpcReg != null) { + for (reg : rpcReg) { + reg.close() } - val sourceProvider = schemaSourceProvider.createInstanceFor(delegate); - schemaContextProvider = new NetconfDeviceSchemaContextProvider(this, sourceProvider); - schemaContextProvider.createContextFromCapabilities(initialCapabilities); - if (mountInstance != null && schemaContext.isPresent) { - mountInstance.schemaContext = schemaContext.get(); + rpcReg = null + } + confReaderReg?.close() + confReaderReg = null + operReaderReg?.close() + operReaderReg = null + commitHandlerReg?.close() + commitHandlerReg = null + + updateDeviceState(false, Collections.emptySet()) + } + + def bringUp(SchemaSourceProvider delegate, Set capabilities) { + remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); + deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider); + deviceContextProvider.createContextFromCapabilities(capabilities); + if (mountInstance != null && schemaContext.isPresent) { + mountInstance.schemaContext = schemaContext.get(); + } + + updateDeviceState(true, capabilities) + + if (mountInstance != null) { + confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); + operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); + commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this); + + val rpcs = new ArrayList(); + for (rpc : mountInstance.schemaContext.operations) { + rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this)); } - ] + rpcReg = rpcs + } + } + + private def updateDeviceState(boolean up, Set capabilities) { + val transaction = dataBroker.beginTransaction + + val it = ImmutableCompositeNode.builder + setQName(INVENTORY_NODE) + addLeaf(INVENTORY_ID, name) + addLeaf(INVENTORY_CONNECTED, up) + + logger.debug("Client capabilities {}", capabilities) + for (capability : capabilities) { + addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability) + } + + logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.") + transaction.removeOperationalData(path) + transaction.putOperationalData(path, it.toInstance) + logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.") + + // FIXME: this has to be asynchronous + val transactionStatus = transaction.commit.get; + + if (transactionStatus.successful) { + logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.") + } else { + logger.debug("Update device state transaction " + transaction.identifier + " FAILED!") + logger.debug("Update device state transaction status " + transaction.status) + } } override readConfigurationData(InstanceIdentifier path) { val result = invokeRpc(NETCONF_GET_CONFIG_QNAME, - wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())); + wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get(); val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME); return data?.findNode(path) as CompositeNode; } override readOperationalData(InstanceIdentifier path) { - val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())); + val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get(); val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME); return data?.findNode(path) as CompositeNode; } @@ -175,18 +219,9 @@ AutoCloseable { override getSupportedRpcs() { Collections.emptySet; } - - def createSubscription(String streamName) { - val it = ImmutableCompositeNode.builder() - QName = NETCONF_CREATE_SUBSCRIPTION_QNAME - addLeaf("stream",streamName); - invokeRpc(QName,toInstance()) - } override invokeRpc(QName rpc, CompositeNode input) { - val message = rpc.toRpcMessage(input); - val result = client.sendMessage(message, messegeRetryCount, messageTimeoutCount); - return result.toRpcResult(); + return listener.sendRequest(rpc.toRpcMessage(input,schemaContext)); } override getProviderFunctionality() { @@ -194,7 +229,7 @@ AutoCloseable { } override onSessionInitiated(ProviderSession session) { - val dataBroker = session.getService(DataBrokerService); + dataBroker = session.getService(DataBrokerService); val transaction = dataBroker.beginTransaction if (transaction.operationalNodeNotExisting) { @@ -221,7 +256,7 @@ AutoCloseable { return null === transaction.readOperationalData(path); } - def Node findNode(CompositeNode node, InstanceIdentifier identifier) { + static def Node findNode(CompositeNode node, InstanceIdentifier identifier) { var Node current = node; for (arg : identifier.path) { @@ -230,11 +265,16 @@ AutoCloseable { } else if (current instanceof CompositeNode) { val currentComposite = (current as CompositeNode); - current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision()); - if (current == null) { - current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision()); + current = currentComposite.getFirstCompositeByName(arg.nodeType); + if(current == null) { + current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision()); + } + if(current == null) { + current = currentComposite.getFirstSimpleByName(arg.nodeType); } if (current == null) { + current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision()); + } if (current == null) { return null; } } @@ -243,123 +283,35 @@ AutoCloseable { } override requestCommit(DataModification modification) { - throw new UnsupportedOperationException("TODO: auto-generated method stub") + val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true); + twoPhaseCommit.prepare() + return twoPhaseCommit; } - def getInitialCapabilities() { - val capabilities = client?.capabilities; - if (capabilities == null) { - return null; - } - if (cachedCapabilities == null) { - cachedCapabilities = FluentIterable.from(capabilities).filter[ + def getCapabilities(Collection capabilities) { + return FluentIterable.from(capabilities).filter[ contains("?") && contains("module=") && contains("revision=")].transform [ val parts = split("\\?"); val namespace = parts.get(0); val queryParams = FluentIterable.from(parts.get(1).split("&")); - val revision = queryParams.findFirst[startsWith("revision=")].replaceAll("revision=", ""); - val moduleName = queryParams.findFirst[startsWith("module=")].replaceAll("module=", ""); + var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", ""); + val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", ""); + if (revision === null) { + logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision="); + revision = queryParams.findFirst[startsWith("&revision=")]?.replaceAll("revision=", ""); + if (revision != null) { + logger.warn("Netconf device returned revision incorectly escaped for {}", it) + } + } + if (revision == null) { + return QName.create(URI.create(namespace), null, moduleName); + } return QName.create(namespace, revision, moduleName); ].toSet(); - } - return cachedCapabilities; } override close() { - confReaderReg?.close() - operReaderReg?.close() - client?.close() - } - -} - -package class NetconfDeviceListener extends NetconfClientSessionListener { - - val NetconfDevice device - val EventExecutor eventExecutor - - new(NetconfDevice device,EventExecutor eventExecutor) { - this.device = device - this.eventExecutor = eventExecutor - } - - var Promise messagePromise; - val promiseLock = new ReentrantLock; - - override onMessage(NetconfClientSession session, NetconfMessage message) { - if (isNotification(message)) { - onNotification(session, message); - } else try { - promiseLock.lock - if (messagePromise != null) { - messagePromise.setSuccess(message); - messagePromise = null; - } - } finally { - promiseLock.unlock - } - } - - /** - * Method intended to customize notification processing. - * - * @param session - * {@see - * NetconfClientSessionListener#onMessage(NetconfClientSession, - * NetconfMessage)} - * @param message - * {@see - * NetconfClientSessionListener#onMessage(NetconfClientSession, - * NetconfMessage)} - */ - def void onNotification(NetconfClientSession session, NetconfMessage message) { - device.logger.debug("Received NETCONF notification.",message); - val domNotification = message?.toCompositeNode?.notificationBody; - if(domNotification != null) { - device?.mountInstance?.publish(domNotification); - } - } - - private static def CompositeNode getNotificationBody(CompositeNode node) { - for(child : node.children) { - if(child instanceof CompositeNode) { - return child as CompositeNode; - } - } - } - - override getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException { - val promise = promiseReply(); - val messageAvailable = promise.await(attempts + attemptMsDelay); - if (messageAvailable) { - try { - return promise.get(); - } catch (ExecutionException e) { - throw new IllegalStateException(e); - } - } - - throw new IllegalStateException("Unsuccessful after " + attempts + " attempts."); - - // throw new TimeoutException("Message was not received on time."); - } - - def Promise promiseReply() { - promiseLock.lock - try { - if (messagePromise == null) { - messagePromise = eventExecutor.newPromise(); - return messagePromise; - } - return messagePromise; - } finally { - promiseLock.unlock - } - } - - def boolean isNotification(NetconfMessage message) { - val xmle = XmlElement.fromDomDocument(message.getDocument()); - return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()); + bringDown() } } @@ -377,25 +329,33 @@ package class NetconfDeviceSchemaContextProvider { new(NetconfDevice device, SchemaSourceProvider sourceProvider) { _device = device _sourceProvider = sourceProvider + _currentContext = Optional.absent(); } def createContextFromCapabilities(Iterable capabilities) { - - val modelsToParse = ImmutableMap.builder(); - for (cap : capabilities) { - val source = sourceProvider.getSchemaSource(cap.localName, Optional.fromNullable(cap.formattedRevision)); - if (source.present) { - modelsToParse.put(cap, source.get()); - } + val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider) + if (!sourceContext.missingSources.empty) { + device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources); + } + device.logger.debug("Trying to create schema context from {}", sourceContext.validSources) + val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext); + if (!sourceContext.validSources.empty) { + val schemaContext = tryToCreateContext(modelsToParse); + currentContext = Optional.fromNullable(schemaContext); + } else { + currentContext = Optional.absent(); } - val context = tryToCreateContext(modelsToParse.build); - currentContext = Optional.fromNullable(context); + if (currentContext.present) { + device.logger.debug("Schema context successfully created."); + } + } - def SchemaContext tryToCreateContext(Map modelsToParse) { + def SchemaContext tryToCreateContext(List modelsToParse) { val parser = new YangParserImpl(); try { - val models = parser.parseYangModelsFromStreams(ImmutableList.copyOf(modelsToParse.values)); + + val models = parser.parseYangModelsFromStreams(modelsToParse); val result = parser.resolveSchemaContext(models); return result; } catch (Exception e) { @@ -404,33 +364,3 @@ package class NetconfDeviceSchemaContextProvider { } } } - -package class NetconfDeviceSchemaSourceProvider implements SchemaSourceProvider { - - val NetconfDevice device; - - new(NetconfDevice device) { - this.device = device; - } - - override getSchemaSource(String moduleName, Optional revision) { - val it = ImmutableCompositeNode.builder() // - setQName(QName::create(NetconfState.QNAME, "get-schema")) // - addLeaf("format", "yang") - addLeaf("identifier", moduleName) - if (revision.present) { - addLeaf("version", revision.get()) - } - - device.logger.info("Loading YANG schema source for {}:{}", moduleName, revision) - val schemaReply = device.invokeRpc(getQName(), toInstance()); - - if (schemaReply.successful) { - val schemaBody = schemaReply.result.getFirstSimpleByName( - QName::create(NetconfState.QNAME.namespace, null, "data"))?.value; - device.logger.info("YANG Schema successfully received for: {}:{}", moduleName, revision); - return Optional.of(schemaBody as String); - } - return Optional.absent(); - } -}