2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connect.netconf
10 import com.google.common.base.Optional
11 import com.google.common.collect.FluentIterable
12 import io.netty.util.concurrent.EventExecutor
13 import java.io.InputStream
14 import java.net.InetSocketAddress
16 import java.util.ArrayList
17 import java.util.Collection
18 import java.util.Collections
21 import java.util.concurrent.ExecutorService
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
23 import org.opendaylight.controller.md.sal.common.api.data.DataModification
24 import org.opendaylight.controller.md.sal.common.api.data.DataReader
25 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
26 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
27 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
28 import org.opendaylight.controller.sal.core.api.Provider
29 import org.opendaylight.controller.sal.core.api.RpcImplementation
30 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
31 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
32 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
33 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
34 import org.opendaylight.protocol.framework.ReconnectStrategy
35 import org.opendaylight.yangtools.concepts.Registration
36 import org.opendaylight.yangtools.yang.common.QName
37 import org.opendaylight.yangtools.yang.data.api.CompositeNode
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
39 import org.opendaylight.yangtools.yang.data.api.Node
40 import org.opendaylight.yangtools.yang.data.api.SimpleNode
41 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
42 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
43 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext
45 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
46 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
47 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
48 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
49 import org.slf4j.Logger
50 import org.slf4j.LoggerFactory
52 import static com.google.common.base.Preconditions.*
53 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
55 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
// NETCONF device connector. In addition to Provider it implements the data reader
// and commit handler contracts for InstanceIdentifier/CompositeNode data; bringUp
// registers this same instance for configuration reads, operational reads and commits.
57 class NetconfDevice implements Provider, //
58 DataReader<InstanceIdentifier, CompositeNode>, //
59 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
// Remote address passed to dispatcher.createClient.
64 var InetSocketAddress socketAddress;
// Mount point obtained from mountService.createOrGetMountPoint(path) in onSessionInitiated.
67 var MountProvisionInstance mountInstance;
// Must be non-null before the client is created (enforced with checkState).
70 var EventExecutor eventExecutor;
73 var ExecutorService processingExecutor;
// Identifier of this device's inventory node; built in the constructor from the device name.
76 var InstanceIdentifier path;
// Passed to dispatcher.createClient together with the address and listener.
79 var ReconnectStrategy reconnectStrategy;
// Must be non-null before the client is created; bringUp derives remoteSourceProvider from it.
82 var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
// Created in bringUp; while it is null getSchemaContext reports Optional.absent().
85 private NetconfDeviceSchemaContextProvider deviceContextProvider
// Per-device logger, named after the class and the device name.
87 protected val Logger logger
// Registrations returned by mountInstance in bringUp.
89 Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
90 Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
91 Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
92 List<RpcRegistration> rpcReg
// Looked up from the provider session in onSessionInitiated.
97 MountProvisionService mountService
// Must be non-null before the client is created; used to create the NETCONF client.
100 var NetconfClientDispatcher dispatcher
// Identifier built without any path arguments; used as the path for all mount-point registrations.
102 static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
// Result of schemaSourceProvider.createInstanceFor(delegate), assigned in bringUp.
105 var SchemaSourceProvider<InputStream> remoteSourceProvider
// Looked up from the provider session in onSessionInitiated; source of all data transactions here.
107 DataBrokerService dataBroker
// Created right before the client is created; invokeRpc sends requests through it.
109 var NetconfDeviceListener listener;
// Creates a device for the given name: sets up the per-device logger and the
// instance identifier of the device's inventory node.
111 public new(String name) {
// Logger name is the NetconfDevice class name followed by "#" and the device name.
113 this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
// path = INVENTORY_PATH extended with an INVENTORY_NODE entry keyed by INVENTORY_ID -> name.
114 this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
115 Collections.singletonMap(INVENTORY_ID, name)).toInstance;
// Preconditions: checkState (statically imported from Guava Preconditions) rejects the
// call with the given message when a required collaborator has not been set.
119 checkState(dispatcher != null, "Dispatcher must be set.");
120 checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
121 checkState(eventExecutor != null, "Event executor must be set.");
// The listener is bound to this device and handed to the dispatcher below.
123 listener = new NetconfDeviceListener(this);
125 logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
// Create the client for the configured address, using the listener and reconnect strategy.
127 dispatcher.createClient(socketAddress, listener, reconnectStrategy);
// Returns the schema context currently held by deviceContextProvider, or
// Optional.absent() when no context provider has been created yet.
130 def Optional<SchemaContext> getSchemaContext() {
131 if (deviceContextProvider == null) {
132 return Optional.absent();
134 return deviceContextProvider.currentContext;
// rpcReg is only touched when it has been set.
138 if (rpcReg != null) {
// Close the reader and commit-handler registrations; the ?. operator skips close()
// for a registration that is null.
144 confReaderReg?.close()
146 operReaderReg?.close()
148 commitHandlerReg?.close()
149 commitHandlerReg = null
// Publish the device state with connected = false and an empty capability set.
151 updateDeviceState(false, Collections.emptySet())
// Brings the device up: builds a schema context from the given capabilities,
// publishes the device state as connected, and registers this device on the mount
// instance as configuration reader, operational reader, commit handler and RPC implementation.
154 def bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
// Derive the InputStream-based source provider from the configured schemaSourceProvider.
155 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
156 deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
157 deviceContextProvider.createContextFromCapabilities(capabilities);
// Hand the schema to the mount instance only if a context is actually present.
158 if (mountInstance != null && schemaContext.isPresent) {
159 mountInstance.schemaContext = schemaContext.get();
162 updateDeviceState(true, capabilities)
// All three registrations use ROOT_PATH and this device as the implementation.
164 if (mountInstance != null) {
165 confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
166 operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
167 commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
// Register this device as the implementation of every operation found in the
// mount instance's schema context, collecting the returned registrations.
169 val rpcs = new ArrayList<RpcRegistration>();
170 if (mountInstance != null && schemaContext.isPresent) {
171 for (rpc : mountInstance.schemaContext.operations) {
172 rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this));
// Writes this device's inventory node (id, connected flag and one leaf per
// capability) as operational data at path and commits the transaction.
//   up           - value stored in the INVENTORY_CONNECTED leaf
//   capabilities - each element becomes a NETCONF_INVENTORY_INITIAL_CAPABILITY leaf
179 private def updateDeviceState(boolean up, Set<QName> capabilities) {
180 val transaction = dataBroker.beginTransaction
// Naming the builder "it" lets the following setQName/addLeaf calls resolve against it implicitly.
182 val it = ImmutableCompositeNode.builder
183 setQName(INVENTORY_NODE)
184 addLeaf(INVENTORY_ID, name)
185 addLeaf(INVENTORY_CONNECTED, up)
187 logger.debug("Client capabilities {}", capabilities)
188 for (capability : capabilities) {
189 addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
192 logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
// Remove whatever is stored at path first, then store the freshly built node.
193 transaction.removeOperationalData(path)
194 transaction.putOperationalData(path, it.toInstance)
195 logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
197 // FIXME: this has to be asynchronous
// get blocks the calling thread until the commit has finished.
198 val transactionStatus = transaction.commit.get;
// NOTE(review): both outcomes, including failure, are logged at debug level only - confirm this is intended.
200 if (transactionStatus.successful) {
201 logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.")
203 logger.debug("Update device state transaction " + transaction.identifier + " FAILED!")
204 logger.debug("Update device state transaction status " + transaction.status)
// Reads configuration data for path: invokes the NETCONF_GET_CONFIG_QNAME RPC with
// CONFIG_SOURCE_RUNNING and a filter structure built from path, and blocks on get()
// until the RPC result is available.
208 override readConfigurationData(InstanceIdentifier path) {
209 val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
210 wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get();
// First NETCONF_DATA_QNAME composite of the reply.
211 val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
// ?. yields null when there is no data element; otherwise the node located at path is returned.
212 return data?.findNode(path) as CompositeNode;
// Reads operational data for path: invokes the NETCONF_GET_QNAME RPC with a filter
// structure built from path and blocks on get() until the RPC result is available.
215 override readOperationalData(InstanceIdentifier path) {
216 val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get();
// First NETCONF_DATA_QNAME composite of the reply.
217 val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
// ?. yields null when there is no data element; otherwise the node located at path is returned.
218 return data?.findNode(path) as CompositeNode;
// Always reports an empty set of supported RPCs.
221 override getSupportedRpcs() {
222 Collections.emptySet;
// Converts the RPC name and input into an RPC message (toRpcMessage is given the
// current schema context) and sends it through the device listener.
225 override invokeRpc(QName rpc, CompositeNode input) {
226 return listener.sendRequest(rpc.toRpcMessage(input,schemaContext));
229 override getProviderFunctionality() {
// Provider session callback: obtains the data broker, makes sure the device's
// node exists in operational and configuration data, then obtains the mount point.
233 override onSessionInitiated(ProviderSession session) {
234 dataBroker = session.getService(DataBrokerService);
// A single transaction writes nodeWithId at path into each data tree whose read of path returned null.
236 val transaction = dataBroker.beginTransaction
237 if (transaction.operationalNodeNotExisting) {
238 transaction.putOperationalData(path, nodeWithId)
240 if (transaction.configurationNodeNotExisting) {
241 transaction.putConfigurationData(path, nodeWithId)
// Blocks until the commit has finished.
243 transaction.commit().get();
244 mountService = session.getService(MountProvisionService);
// ?. leaves mountInstance null when no MountProvisionService is available.
245 mountInstance = mountService?.createOrGetMountPoint(path);
// Builds an INVENTORY_NODE composite whose only child is an INVENTORY_ID leaf
// carrying the device name; both nodes are created with a null parent argument.
248 def getNodeWithId() {
249 val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
250 return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
// True when reading configuration data at this device's path through the given
// transaction returns null.
253 def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
254 return null === transaction.readConfigurationData(path);
// True when reading operational data at this device's path through the given
// transaction returns null.
257 def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
258 return null === transaction.readOperationalData(path);
// Walks down from node along the path arguments of identifier, one child lookup per argument.
261 static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
263 var Node<?> current = node;
264 for (arg : identifier.path) {
// Only composite nodes can be descended into; simple nodes are handled separately.
265 if (current instanceof SimpleNode<?>) {
267 } else if (current instanceof CompositeNode) {
268 val currentComposite = (current as CompositeNode);
// Lookup order for the child named by this argument:
//   1. composite child with the exact node type,
//   2. composite child with the revision stripped from the node type,
//   3. simple child with the exact node type,
//   4. simple child with the revision stripped from the node type.
270 current = currentComposite.getFirstCompositeByName(arg.nodeType);
271 if(current == null) {
272 current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
274 if(current == null) {
275 current = currentComposite.getFirstSimpleByName(arg.nodeType);
277 if (current == null) {
278 current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
// None of the four lookups matched.
279 } if (current == null) {
// Wraps the modification in a NetconfDeviceTwoPhaseCommitTransaction for this
// device, calls prepare() on it and returns it to the caller.
287 override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
// NOTE(review): the meaning of the third constructor argument (true) is not visible here - confirm against NetconfDeviceTwoPhaseCommitTransaction.
288 val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true);
289 twoPhaseCommit.prepare()
290 return twoPhaseCommit;
// Converts capability strings into QNames. Only capabilities containing "?",
// "module=" and "revision=" are considered. The text before "?" is used as the
// namespace; the part after it is split on "&" into query parameters from which
// the module name and revision are taken.
//   capabilities - raw capability strings reported by the device
293 def getCapabilities(Collection<String> capabilities) {
294 return FluentIterable.from(capabilities).filter[
295 contains("?") && contains("module=") && contains("revision=")].transform [
296 val parts = split("\\?");
297 val namespace = parts.get(0);
298 val queryParams = FluentIterable.from(parts.get(1).split("&"));
299 var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
300 val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
301 if (revision === null) {
302 logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
// Fallback for a revision parameter whose separator was left escaped as "&amp;".
// The query string has already been split on "&", so such a parameter starts with
// "amp;revision=" and can never start with "&"; match and strip that whole prefix.
303 revision = queryParams.findFirst[startsWith("amp;revision=")]?.replaceAll("amp;revision=", "");
304 if (revision != null) {
305 logger.warn("Netconf device returned revision incorectly escaped for {}", it)
// Without any usable revision the QName is created from the namespace URI with a null revision.
308 if (revision == null) {
309 return QName.create(URI.create(namespace), null, moduleName);
311 return QName.create(namespace, revision, moduleName);
// Builds and holds the schema context of one NetconfDevice from the YANG sources
// available through a SchemaSourceProvider.
320 package class NetconfDeviceSchemaContextProvider {
// Owning device; its logger is used for all messages emitted here.
323 val NetconfDevice device;
// Source of YANG model streams used when building the schema context.
326 val SchemaSourceProvider<InputStream> sourceProvider;
// Most recently built schema context; absent until one has been created successfully.
329 var Optional<SchemaContext> currentContext;
// Stores the source provider and starts with no schema context.
331 new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
333 _sourceProvider = sourceProvider
334 _currentContext = Optional.absent();
// Resolves sources for the given capabilities and tries to build a schema context
// from the valid ones, storing the outcome in currentContext.
337 def createContextFromCapabilities(Iterable<QName> capabilities) {
338 val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
// Missing sources are reported with a warning; processing continues with the rest.
339 if (!sourceContext.missingSources.empty) {
340 device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
342 device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
343 val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
// Parse only when at least one valid source exists; a null parse result becomes Optional.absent().
344 if (!sourceContext.validSources.empty) {
345 val schemaContext = tryToCreateContext(modelsToParse);
346 currentContext = Optional.fromNullable(schemaContext);
348 currentContext = Optional.absent();
350 if (currentContext.present) {
351 device.logger.debug("Schema context successfully created.");
// Parses the given YANG model streams with a fresh YangParserImpl and resolves them
// into a schema context.
356 def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
357 val parser = new YangParserImpl();
360 val models = parser.parseYangModelsFromStreams(modelsToParse);
361 val result = parser.resolveSchemaContext(models);
// Any exception raised while parsing or resolving is caught and logged at debug level.
363 } catch (Exception e) {
364 device.logger.debug("Error occured during parsing YANG schemas", e);