2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connect.netconf
10 import com.google.common.base.Optional
11 import com.google.common.collect.FluentIterable
12 import io.netty.util.concurrent.EventExecutor
13 import java.io.InputStream
14 import java.net.InetSocketAddress
16 import java.util.ArrayList
17 import java.util.Collection
18 import java.util.Collections
21 import java.util.concurrent.ExecutorService
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
23 import org.opendaylight.controller.md.sal.common.api.data.DataModification
24 import org.opendaylight.controller.md.sal.common.api.data.DataReader
25 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
26 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
27 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
28 import org.opendaylight.controller.sal.core.api.Provider
29 import org.opendaylight.controller.sal.core.api.RpcImplementation
30 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
31 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
32 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
33 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
34 import org.opendaylight.protocol.framework.ReconnectStrategy
35 import org.opendaylight.yangtools.concepts.Registration
36 import org.opendaylight.yangtools.yang.common.QName
37 import org.opendaylight.yangtools.yang.data.api.CompositeNode
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
39 import org.opendaylight.yangtools.yang.data.api.Node
40 import org.opendaylight.yangtools.yang.data.api.SimpleNode
41 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
42 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
43 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext
45 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
46 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
47 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
48 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
49 import org.slf4j.Logger
50 import org.slf4j.LoggerFactory
52 import static com.google.common.base.Preconditions.*
53 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
55 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
57 class NetconfDevice implements Provider, //
58 DataReader<InstanceIdentifier, CompositeNode>, //
59 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
// Remote address handed to the dispatcher when the NETCONF client is created.
64 var InetSocketAddress socketAddress;
// Mount point for this device; obtained from mountService in onSessionInitiated.
67 var MountProvisionInstance mountInstance;
// Checked for non-null (checkState) before the client is created.
70 var EventExecutor eventExecutor;
// NOTE(review): not referenced by any code visible in this section.
73 var ExecutorService processingExecutor;
// Inventory path of this device, built in the constructor from the device name.
76 var InstanceIdentifier path;
// Passed to dispatcher.createClient together with the address and listener.
79 var ReconnectStrategy reconnectStrategy;
// Checked for non-null before the client is created; bringUp derives
// remoteSourceProvider from it via createInstanceFor.
82 var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
// Created in bringUp; getSchemaContext returns Optional.absent() while this is null.
85 private NetconfDeviceSchemaContextProvider deviceContextProvider
// Logger named NetconfDevice.name + "#" + <device name> (see constructor).
87 protected val Logger logger
// Registrations obtained from mountInstance in bringUp and closed on tear-down.
89 Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
90 Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
91 Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
92 List<RpcRegistration> rpcReg
// Looked up from the provider session in onSessionInitiated; may be null (used with ?.).
97 MountProvisionService mountService
// Checked for non-null before use; creates the NETCONF client.
100 var NetconfClientDispatcher dispatcher
// Empty instance identifier used as the registration path for readers and the commit handler.
102 static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
// Assigned in bringUp from schemaSourceProvider.createInstanceFor(delegate).
105 var SchemaSourceProvider<InputStream> remoteSourceProvider
// Looked up from the provider session in onSessionInitiated; used by updateDeviceState.
107 DataBrokerService dataBroker
// Created just before the client is started; invokeRpc sends requests through it.
109 var NetconfDeviceListener listener;
/**
 * Creates a device handle for the given name.
 *
 * Sets up a logger named {@code NetconfDevice.name + "#" + name} and computes
 * the device's inventory path: INVENTORY_PATH extended with an INVENTORY_NODE
 * entry keyed by INVENTORY_ID = name.
 *
 * @param name device name used in the logger name and as the inventory key
 */
111 public new(String name) {
113 this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
114 this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
115 Collections.singletonMap(INVENTORY_ID, name)).toInstance;
// Start-up sequence: fail fast with IllegalStateException (checkState) when a
// required collaborator has not been set, then create the listener and ask the
// dispatcher to open the NETCONF client towards socketAddress.
119 checkState(dispatcher != null, "Dispatcher must be set.");
120 checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
121 checkState(eventExecutor != null, "Event executor must be set.");
// The listener is bound to this device instance.
123 listener = new NetconfDeviceListener(this);
125 logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
127 dispatcher.createClient(socketAddress, listener, reconnectStrategy);
/**
 * Returns the schema context currently known for this device.
 *
 * @return the context held by the device context provider, or
 *         {@code Optional.absent()} while no provider has been created yet
 */
def Optional<SchemaContext> getSchemaContext() {
    if (deviceContextProvider != null) {
        return deviceContextProvider.currentContext
    }
    return Optional.absent()
}
// Tear-down sequence: release whatever was registered while the device was up,
// then publish the device as disconnected.
138 if (rpcReg != null) {
// Null-safe close: each registration may never have been created.
144 confReaderReg?.close()
146 operReaderReg?.close()
148 commitHandlerReg?.close()
149 commitHandlerReg = null
// Report "not connected" with an empty capability set.
151 updateDeviceState(false, Collections.emptySet())
/**
 * Brings the device up after capabilities are known.
 *
 * Creates the remote schema source provider from {@code delegate}, builds the
 * device schema context from {@code capabilities}, publishes the device as
 * connected, and, when a mount instance exists, registers this object as
 * configuration reader, operational reader, commit handler and RPC
 * implementation on it.
 *
 * @param delegate schema source used to back the caching schema source provider
 * @param capabilities module QNames used to build the schema context and
 *        written to the device's inventory node
 */
154 def bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
155 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
156 deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
157 deviceContextProvider.createContextFromCapabilities(capabilities);
// Only push a schema context to the mount point when one could be created.
158 if (mountInstance != null && schemaContext.isPresent) {
159 mountInstance.schemaContext = schemaContext.get();
162 updateDeviceState(true, capabilities)
164 if (mountInstance != null) {
165 confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
166 operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
167 commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
// One RPC registration per operation in the mount point's schema context.
// NOTE(review): mountInstance.schemaContext is read here even when no schema
// context was set above - confirm it cannot be null at this point.
169 val rpcs = new ArrayList<RpcRegistration>();
170 for (rpc : mountInstance.schemaContext.operations) {
171 rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this));
/**
 * Publishes this device's inventory node into the operational data store.
 *
 * The node carries the device id, the connected flag and one
 * NETCONF_INVENTORY_INITIAL_CAPABILITY leaf per capability. The existing node
 * at {@code path} is removed and re-put within a single transaction, and the
 * commit result is awaited synchronously.
 *
 * @param up whether the device is reported as connected
 * @param capabilities capabilities written into the inventory node
 */
private def updateDeviceState(boolean up, Set<QName> capabilities) {
    val transaction = dataBroker.beginTransaction

    // `it` is the node builder; the unqualified setQName/addLeaf calls below target it.
    val it = ImmutableCompositeNode.builder
    setQName(INVENTORY_NODE)
    addLeaf(INVENTORY_ID, name)
    addLeaf(INVENTORY_CONNECTED, up)

    logger.debug("Client capabilities {}", capabilities)
    for (capability : capabilities) {
        addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
    }

    // Placeholders avoid building the message when debug logging is disabled.
    logger.debug("Update device state transaction {} putting operational data started.", transaction.identifier)
    transaction.removeOperationalData(path)
    transaction.putOperationalData(path, it.toInstance)
    logger.debug("Update device state transaction {} putting operational data ended.", transaction.identifier)

    // FIXME: this has to be asynchronous
    val transactionStatus = transaction.commit.get;

    if (transactionStatus.successful) {
        logger.debug("Update device state transaction {} SUCCESSFUL.", transaction.identifier)
    } else {
        // A failed state update must be visible without enabling debug logging.
        logger.warn("Update device state transaction {} FAILED!", transaction.identifier)
        logger.warn("Update device state transaction status {}", transaction.status)
    }
}
/**
 * Reads configuration data for {@code path} from the device.
 *
 * Issues a get-config RPC against the running source, filtered by the path,
 * and looks the path up inside the returned data node.
 *
 * @param path identifier of the node to read
 * @return the node found at {@code path}, or null when the reply carries no data node
 */
override readConfigurationData(InstanceIdentifier path) {
    val request = wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())
    val rpcResult = invokeRpc(NETCONF_GET_CONFIG_QNAME, request).get()
    val dataNode = rpcResult.result.getFirstCompositeByName(NETCONF_DATA_QNAME)
    return dataNode?.findNode(path) as CompositeNode
}
/**
 * Reads operational data for {@code path} from the device.
 *
 * Issues a get RPC filtered by the path and looks the path up inside the
 * returned data node.
 *
 * @param path identifier of the node to read
 * @return the node found at {@code path}, or null when the reply carries no data node
 */
override readOperationalData(InstanceIdentifier path) {
    val request = wrap(NETCONF_GET_QNAME, path.toFilterStructure())
    val rpcResult = invokeRpc(NETCONF_GET_QNAME, request).get()
    val dataNode = rpcResult.result.getFirstCompositeByName(NETCONF_DATA_QNAME)
    return dataNode?.findNode(path) as CompositeNode
}
/**
 * Returns the RPCs this implementation announces.
 *
 * @return always an empty set
 */
override getSupportedRpcs() {
    return Collections.emptySet()
}
/**
 * Sends an RPC to the device through the NETCONF listener.
 *
 * @param rpc QName of the RPC to invoke
 * @param input RPC input payload
 * @return whatever the listener returns for the sent request
 */
override invokeRpc(QName rpc, CompositeNode input) {
    val message = rpc.toRpcMessage(input, schemaContext)
    return listener.sendRequest(message)
}
// Overrides a supertype method; presumably part of the Provider contract this
// class implements - TODO confirm against the Provider interface.
227 override getProviderFunctionality() {
/**
 * Wires the device into the broker once a provider session is available.
 *
 * Looks up the data broker, makes sure the device's inventory node exists in
 * both the operational and the configuration store (creating an id-only node
 * where it is missing), waits for that transaction to commit, and then
 * obtains the mount point for the device path when a mount service exists.
 *
 * @param session provider session used to look up broker services
 */
override onSessionInitiated(ProviderSession session) {
    dataBroker = session.getService(DataBrokerService)

    val tx = dataBroker.beginTransaction()
    if (operationalNodeNotExisting(tx)) {
        tx.putOperationalData(path, getNodeWithId())
    }
    if (configurationNodeNotExisting(tx)) {
        tx.putConfigurationData(path, getNodeWithId())
    }
    tx.commit().get()

    mountService = session.getService(MountProvisionService)
    // The mount service may be unavailable; mountInstance then stays null.
    mountInstance = mountService?.createOrGetMountPoint(path)
}
/**
 * Builds a minimal inventory node for this device.
 *
 * @return an INVENTORY_NODE composite whose only child is the INVENTORY_ID
 *         leaf holding the device name
 */
def getNodeWithId() {
    val idLeaf = new SimpleNodeTOImpl(INVENTORY_ID, null, name)
    new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(idLeaf))
}
/**
 * Tells whether the device node is absent from the configuration store.
 *
 * @param transaction transaction used to read the device path
 * @return true when reading the configuration data at the device path yields null
 */
def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
    val existing = transaction.readConfigurationData(path)
    return existing === null
}
/**
 * Tells whether the device node is absent from the operational store.
 *
 * @param transaction transaction used to read the device path
 * @return true when reading the operational data at the device path yields null
 */
def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
    val existing = transaction.readOperationalData(path)
    return existing === null
}
/**
 * Walks {@code identifier.path} starting at {@code node}, descending one child
 * per path argument.
 *
 * At each step the child is looked up by the argument's node type in this
 * order: composite child, composite child without revision, simple child,
 * simple child without revision.
 *
 * @param node composite node the walk starts from
 * @param identifier path to follow
 */
259 static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
261 var Node<?> current = node;
262 for (arg : identifier.path) {
// A simple node has no children to descend into.
263 if (current instanceof SimpleNode<?>) {
265 } else if (current instanceof CompositeNode) {
266 val currentComposite = (current as CompositeNode);
// Exact QName first, then the same name with the revision stripped.
268 current = currentComposite.getFirstCompositeByName(arg.nodeType);
269 if(current == null) {
270 current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
// No composite child matched: fall back to simple (leaf) children.
272 if(current == null) {
273 current = currentComposite.getFirstSimpleByName(arg.nodeType);
275 if (current == null) {
276 current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
277 } if (current == null) {
/**
 * Starts a two-phase commit of the given modification towards the device.
 *
 * @param modification data modification to be committed
 * @return the already prepared two-phase commit transaction
 */
override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
    val commitTransaction = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true)
    commitTransaction.prepare()
    commitTransaction
}
/**
 * Converts capability URIs advertised by the device into module QNames.
 *
 * Only capabilities that contain a query part with both {@code module=} and
 * {@code revision=} are considered. The part before {@code ?} is the
 * namespace; module name and revision are taken from the query parameters.
 * Devices that send the parameter separator as an undecoded {@code &amp;}
 * are tolerated: the revision is then recovered from the
 * {@code amp;revision=} parameter and a warning is logged.
 *
 * @param capabilities raw capability strings reported by the device
 * @return QNames (namespace, revision, module name) of the advertised modules;
 *         the revision is null when it could not be determined
 */
def getCapabilities(Collection<String> capabilities) {
    return FluentIterable.from(capabilities).filter[
            contains("?") && contains("module=") && contains("revision=")].transform [
            val parts = split("\\?");
            val namespace = parts.get(0);
            val queryParams = FluentIterable.from(parts.get(1).split("&"));
            var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
            val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
            if (revision === null) {
                logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
                // The query string was split on '&' above, so an undecoded
                // "&amp;revision=<rev>" arrives here as "amp;revision=<rev>".
                // Match and strip that whole prefix.
                revision = queryParams.findFirst[startsWith("amp;revision=")]?.replaceAll("amp;revision=", "");
                if (revision != null) {
                    logger.warn("Netconf device returned revision incorectly escaped for {}", it)
                }
            }
            if (revision == null) {
                return QName.create(URI.create(namespace), null, moduleName);
            }
            return QName.create(namespace, revision, moduleName);
        ].toSet();
}
318 package class NetconfDeviceSchemaContextProvider {
321 val NetconfDevice device;
324 val SchemaSourceProvider<InputStream> sourceProvider;
327 var Optional<SchemaContext> currentContext;
329 new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
331 _sourceProvider = sourceProvider
332 _currentContext = Optional.absent();
335 def createContextFromCapabilities(Iterable<QName> capabilities) {
336 val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
337 if (!sourceContext.missingSources.empty) {
338 device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
340 device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
341 val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
342 if (!sourceContext.validSources.empty) {
343 val schemaContext = tryToCreateContext(modelsToParse);
344 currentContext = Optional.fromNullable(schemaContext);
346 currentContext = Optional.absent();
348 if (currentContext.present) {
349 device.logger.debug("Schema context successfully created.");
354 def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
355 val parser = new YangParserImpl();
358 val models = parser.parseYangModelsFromStreams(modelsToParse);
359 val result = parser.resolveSchemaContext(models);
361 } catch (Exception e) {
362 device.logger.debug("Error occured during parsing YANG schemas", e);