X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.xtend;h=3eb0472b5c5bf3a4b1079e731875529f072da7d0;hp=21500e1da6441540344daf741dd6becec70ed5a6;hb=bcdc6138d215d097b13510e08735808ed931aeda;hpb=d71e327e51db32e967f7ebcb186e148f37f28117 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend index 21500e1da6..3eb0472b5c 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend @@ -1,3 +1,10 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.connect.netconf import com.google.common.base.Optional @@ -17,6 +24,7 @@ import org.opendaylight.controller.md.sal.common.api.data.DataReader import org.opendaylight.controller.netconf.api.NetconfMessage import org.opendaylight.controller.netconf.client.NetconfClient import org.opendaylight.controller.netconf.client.NetconfClientDispatcher +import org.opendaylight.controller.netconf.util.xml.XmlUtil import org.opendaylight.controller.sal.core.api.Broker.ProviderSession import org.opendaylight.controller.sal.core.api.Provider import org.opendaylight.controller.sal.core.api.RpcImplementation @@ -25,7 +33,6 @@ import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance import org.opendaylight.controller.sal.core.api.mount.MountProvisionService import org.opendaylight.protocol.framework.ReconnectStrategy -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState import org.opendaylight.yangtools.concepts.Registration import org.opendaylight.yangtools.yang.common.QName import org.opendaylight.yangtools.yang.data.api.CompositeNode @@ -39,7 +46,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders -import org.opendaylight.yangtools.yang.model.util.repo.SourceIdentifier import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext import org.slf4j.Logger @@ -49,7 +55,7 @@ import static com.google.common.base.Preconditions.* import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.* import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.* -import org.opendaylight.controller.netconf.util.xml.XmlUtil +import com.google.common.util.concurrent.Futures class NetconfDevice implements Provider, // DataReader, // @@ -105,6 +111,8 @@ AutoCloseable { @Property var SchemaSourceProvider remoteSourceProvider + + DataBrokerService dataBroker public new(String name) { this.name = name; @@ -118,16 +126,10 @@ AutoCloseable { checkState(schemaSourceProvider != null, "Schema Source Provider must be set.") checkState(eventExecutor != null, "Event executor must be set."); - val listener = new NetconfDeviceListener(this, eventExecutor); + val listener = new NetconfDeviceListener(this); val task = startClientTask(dispatcher, listener) - if (mountInstance != null) { - confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); - operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); - commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this) - } return processingExecutor.submit(task) as Future; - //commitHandlerReg = mountInstance.registerCommitHandler(path,this); } def Optional getSchemaContext() { @@ -138,38 +140,77 @@ AutoCloseable { } private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) { - return [ | - logger.info("Starting Netconf Client on: {}", socketAddress); - client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener); - logger.debug("Initial capabilities {}", initialCapabilities); - var SchemaSourceProvider delegate; - if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) { - delegate = new NetconfRemoteSchemaSourceProvider(this); - } else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) { - delegate = new NetconfRemoteSchemaSourceProvider(this); - } else { - logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress); - delegate = SchemaSourceProviders.noopProvider(); - } - remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); - deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider); - deviceContextProvider.createContextFromCapabilities(initialCapabilities); - if (mountInstance != null && schemaContext.isPresent) { - mountInstance.schemaContext = schemaContext.get(); + try { + logger.info("Starting Netconf Client on: {}", socketAddress); + client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener); + logger.debug("Initial capabilities {}", initialCapabilities); + var SchemaSourceProvider delegate; + if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) { + delegate = new NetconfRemoteSchemaSourceProvider(this); + } else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) { + delegate = new NetconfRemoteSchemaSourceProvider(this); + } else { + logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress); + delegate = SchemaSourceProviders.noopProvider(); + } + remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); + deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider); + deviceContextProvider.createContextFromCapabilities(initialCapabilities); + if (mountInstance != null && schemaContext.isPresent) { + mountInstance.schemaContext = schemaContext.get(); + val operations = schemaContext.get().operations; + for (rpc : operations) { + mountInstance.addRpcImplementation(rpc.QName, this); + } + } + updateDeviceState() + if (mountInstance != null && confReaderReg == null && operReaderReg == null) { + confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); + operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); + commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this); + } + } catch (Exception e) { + logger.error("Netconf client NOT started. ", e) } ] } + private def updateDeviceState() { + val transaction = dataBroker.beginTransaction + + val it = ImmutableCompositeNode.builder + setQName(INVENTORY_NODE) + addLeaf(INVENTORY_ID, name) + addLeaf(INVENTORY_CONNECTED, client.clientSession.up) + + logger.debug("Client capabilities {}", client.capabilities) + for (capability : client.capabilities) { + addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability) + } + + logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.") + transaction.putOperationalData(path, it.toInstance) + logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.") + val transactionStatus = transaction.commit.get; + + if (transactionStatus.successful) { + logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.") + } else { + logger.debug("Update device state transaction " + transaction.identifier + " FAILED!") + logger.debug("Update device state transaction status " + transaction.status) + } + } + override readConfigurationData(InstanceIdentifier path) { val result = invokeRpc(NETCONF_GET_CONFIG_QNAME, - wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())); + wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get(); val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME); return data?.findNode(path) as CompositeNode; } override readOperationalData(InstanceIdentifier path) { - val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())); + val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get(); val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME); return data?.findNode(path) as CompositeNode; } @@ -178,19 +219,18 @@ AutoCloseable { Collections.emptySet; } - def createSubscription(String streamName) { - val it = ImmutableCompositeNode.builder() - QName = NETCONF_CREATE_SUBSCRIPTION_QNAME - addLeaf("stream", streamName); - invokeRpc(QName, toInstance()) - } +// def createSubscription(String streamName) { +// val it = ImmutableCompositeNode.builder() +// QName = NETCONF_CREATE_SUBSCRIPTION_QNAME +// addLeaf("stream", streamName); +// invokeRpc(QName, toInstance()) +// } override invokeRpc(QName rpc, CompositeNode input) { try { val message = rpc.toRpcMessage(input,schemaContext); val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount); - return result.toRpcResult(rpc, schemaContext); - + return Futures.immediateFuture(result.toRpcResult(rpc, schemaContext)); } catch (Exception e) { logger.error("Rpc was not processed correctly.", e) throw e; @@ -209,7 +249,7 @@ AutoCloseable { } override onSessionInitiated(ProviderSession session) { - val dataBroker = session.getService(DataBrokerService); + dataBroker = session.getService(DataBrokerService); val transaction = dataBroker.beginTransaction if (transaction.operationalNodeNotExisting) { @@ -302,7 +342,6 @@ AutoCloseable { operReaderReg?.close() client?.close() } - } package class NetconfDeviceSchemaContextProvider {