X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.xtend;h=12e8b5587caa230cb693610d21f8135f61e9e2e9;hp=3799dd245e2eaf96921e4df3dd1b29b531232750;hb=408eeef51f435abd2027f9d25ac5592066b202dd;hpb=f35e990600d56b1d524cc9d9cfc44b725199b1a6 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend index 3799dd245e..12e8b5587c 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend @@ -1,3 +1,10 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.connect.netconf import com.google.common.base.Optional @@ -6,19 +13,18 @@ import io.netty.util.concurrent.EventExecutor import java.io.InputStream import java.net.InetSocketAddress import java.net.URI +import java.util.ArrayList +import java.util.Collection import java.util.Collections import java.util.List import java.util.Set import java.util.concurrent.ExecutorService -import java.util.concurrent.Future import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler import org.opendaylight.controller.md.sal.common.api.data.DataModification import org.opendaylight.controller.md.sal.common.api.data.DataReader -import org.opendaylight.controller.netconf.api.NetconfMessage -import org.opendaylight.controller.netconf.client.NetconfClient import org.opendaylight.controller.netconf.client.NetconfClientDispatcher -import org.opendaylight.controller.netconf.util.xml.XmlUtil import org.opendaylight.controller.sal.core.api.Broker.ProviderSession +import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration import org.opendaylight.controller.sal.core.api.Provider import org.opendaylight.controller.sal.core.api.RpcImplementation import org.opendaylight.controller.sal.core.api.data.DataBrokerService @@ -38,7 +44,6 @@ import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl import org.opendaylight.yangtools.yang.model.api.SchemaContext import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider -import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext import org.slf4j.Logger @@ -49,14 +54,12 @@ import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.* import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.* -class NetconfDevice implements Provider, // +class NetconfDevice implements Provider, // DataReader, // DataCommitHandler, // RpcImplementation, // AutoCloseable { - var NetconfClient client; - @Property var InetSocketAddress socketAddress; @@ -86,15 +89,12 @@ AutoCloseable { Registration> operReaderReg Registration> confReaderReg Registration> commitHandlerReg + List rpcReg + @Property val String name - MountProvisionService mountService - int messegeRetryCount = 5; - - int messageTimeoutCount = 5 * 1000; - - Set cachedCapabilities + MountProvisionService mountService @Property var NetconfClientDispatcher dispatcher @@ -103,11 +103,13 @@ AutoCloseable { @Property var SchemaSourceProvider remoteSourceProvider - + DataBrokerService dataBroker + var NetconfDeviceListener listener; + public new(String name) { - this.name = name; + this._name = name; this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name); this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE, Collections.singletonMap(INVENTORY_ID, name)).toInstance; @@ -118,14 +120,11 @@ AutoCloseable { checkState(schemaSourceProvider != null, "Schema Source Provider must be set.") checkState(eventExecutor != null, "Event executor must be set."); - val listener = new NetconfDeviceListener(this, eventExecutor); - val task = startClientTask(dispatcher, listener) - if (mountInstance != null) { - commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this) - } - return processingExecutor.submit(task) as Future; + listener = new NetconfDeviceListener(this); + + logger.info("Starting NETCONF Client {} for address {}", name, socketAddress); - //commitHandlerReg = mountInstance.registerCommitHandler(path,this); + dispatcher.createClient(socketAddress, listener, reconnectStrategy); } def Optional getSchemaContext() { @@ -135,54 +134,65 @@ AutoCloseable { return deviceContextProvider.currentContext; } - private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) { - return [ | - try { - logger.info("Starting Netconf Client on: {}", socketAddress); - client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener); - logger.debug("Initial capabilities {}", initialCapabilities); - var SchemaSourceProvider delegate; - if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) { - delegate = new NetconfRemoteSchemaSourceProvider(this); - } else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) { - delegate = new NetconfRemoteSchemaSourceProvider(this); - } else { - logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress); - delegate = SchemaSourceProviders.noopProvider(); - } - remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); - deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider); - deviceContextProvider.createContextFromCapabilities(initialCapabilities); - if (mountInstance != null && schemaContext.isPresent) { - mountInstance.schemaContext = schemaContext.get(); - } - updateDeviceState() - if (mountInstance != null && confReaderReg == null && operReaderReg == null) { - confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); - operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); - } - } catch (Exception e) { - logger.error("Netconf client NOT started. ", e) + def bringDown() { + if (rpcReg != null) { + for (reg : rpcReg) { + reg.close() } - ] + rpcReg = null + } + confReaderReg?.close() + confReaderReg = null + operReaderReg?.close() + operReaderReg = null + commitHandlerReg?.close() + commitHandlerReg = null + + updateDeviceState(false, Collections.emptySet()) } - private def updateDeviceState() { + def bringUp(SchemaSourceProvider delegate, Set capabilities) { + remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); + deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider); + deviceContextProvider.createContextFromCapabilities(capabilities); + if (mountInstance != null && schemaContext.isPresent) { + mountInstance.schemaContext = schemaContext.get(); + } + + updateDeviceState(true, capabilities) + + if (mountInstance != null) { + confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); + operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); + commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this); + + val rpcs = new ArrayList(); + for (rpc : mountInstance.schemaContext.operations) { + rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this)); + } + rpcReg = rpcs + } + } + + private def updateDeviceState(boolean up, Set capabilities) { val transaction = dataBroker.beginTransaction val it = ImmutableCompositeNode.builder setQName(INVENTORY_NODE) addLeaf(INVENTORY_ID, name) - addLeaf(INVENTORY_CONNECTED, client.clientSession.up) + addLeaf(INVENTORY_CONNECTED, up) - logger.debug("Client capabilities {}", client.capabilities) - for (capability : client.capabilities) { + logger.debug("Client capabilities {}", capabilities) + for (capability : capabilities) { addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability) } logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.") + transaction.removeOperationalData(path) transaction.putOperationalData(path, it.toInstance) logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.") + + // FIXME: this has to be asynchronous val transactionStatus = transaction.commit.get; if (transactionStatus.successful) { @@ -195,13 +205,13 @@ AutoCloseable { override readConfigurationData(InstanceIdentifier path) { val result = invokeRpc(NETCONF_GET_CONFIG_QNAME, - wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())); + wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get(); val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME); return data?.findNode(path) as CompositeNode; } override readOperationalData(InstanceIdentifier path) { - val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())); + val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get(); val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME); return data?.findNode(path) as CompositeNode; } @@ -210,30 +220,8 @@ AutoCloseable { Collections.emptySet; } - def createSubscription(String streamName) { - val it = ImmutableCompositeNode.builder() - QName = NETCONF_CREATE_SUBSCRIPTION_QNAME - addLeaf("stream", streamName); - invokeRpc(QName, toInstance()) - } - override invokeRpc(QName rpc, CompositeNode input) { - try { - val message = rpc.toRpcMessage(input,schemaContext); - val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount); - return result.toRpcResult(rpc, schemaContext); - - } catch (Exception e) { - logger.error("Rpc was not processed correctly.", e) - throw e; - } - } - - def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) { - logger.debug("Send message {}",XmlUtil.toString(message.document)) - val result = client.sendMessage(message, retryCount, timeout); - NetconfMapping.checkValidReply(message, result) - return result; + return listener.sendRequest(rpc.toRpcMessage(input,schemaContext)); } override getProviderFunctionality() { @@ -276,7 +264,7 @@ AutoCloseable { return null; } else if (current instanceof CompositeNode) { val currentComposite = (current as CompositeNode); - + current = currentComposite.getFirstCompositeByName(arg.nodeType); if(current == null) { current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision()); @@ -295,18 +283,13 @@ AutoCloseable { } override requestCommit(DataModification modification) { - val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification); + val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true); twoPhaseCommit.prepare() return twoPhaseCommit; } - def getInitialCapabilities() { - val capabilities = client?.capabilities; - if (capabilities == null) { - return null; - } - if (cachedCapabilities == null) { - cachedCapabilities = FluentIterable.from(capabilities).filter[ + def getCapabilities(Collection capabilities) { + return FluentIterable.from(capabilities).filter[ contains("?") && contains("module=") && contains("revision=")].transform [ val parts = split("\\?"); val namespace = parts.get(0); @@ -325,16 +308,11 @@ AutoCloseable { } return QName.create(namespace, revision, moduleName); ].toSet(); - } - return cachedCapabilities; } override close() { - confReaderReg?.close() - operReaderReg?.close() - client?.close() + bringDown() } - } package class NetconfDeviceSchemaContextProvider {