2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connect.netconf
10 import com.google.common.base.Optional
11 import com.google.common.collect.FluentIterable
12 import io.netty.util.concurrent.EventExecutor
13 import java.io.InputStream
14 import java.net.InetSocketAddress
16 import java.util.Collections
19 import java.util.concurrent.ExecutorService
20 import java.util.concurrent.Future
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification
23 import org.opendaylight.controller.md.sal.common.api.data.DataReader
24 import org.opendaylight.controller.netconf.api.NetconfMessage
25 import org.opendaylight.controller.netconf.client.NetconfClient
26 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
27 import org.opendaylight.controller.netconf.util.xml.XmlUtil
28 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
29 import org.opendaylight.controller.sal.core.api.Provider
30 import org.opendaylight.controller.sal.core.api.RpcImplementation
31 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
32 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
33 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
34 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
35 import org.opendaylight.protocol.framework.ReconnectStrategy
36 import org.opendaylight.yangtools.concepts.Registration
37 import org.opendaylight.yangtools.yang.common.QName
38 import org.opendaylight.yangtools.yang.data.api.CompositeNode
39 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
40 import org.opendaylight.yangtools.yang.data.api.Node
41 import org.opendaylight.yangtools.yang.data.api.SimpleNode
42 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
43 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
44 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext
46 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
47 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
48 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
49 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
50 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
51 import org.slf4j.Logger
52 import org.slf4j.LoggerFactory
54 import static com.google.common.base.Preconditions.*
55 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
57 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
58 import com.google.common.util.concurrent.Futures
60 class NetconfDevice implements Provider, //
61 DataReader<InstanceIdentifier, CompositeNode>, //
62 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
66 var NetconfClient client;
69 var InetSocketAddress socketAddress;
72 var MountProvisionInstance mountInstance;
75 var EventExecutor eventExecutor;
78 var ExecutorService processingExecutor;
81 var InstanceIdentifier path;
84 var ReconnectStrategy reconnectStrategy;
87 var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
90 private NetconfDeviceSchemaContextProvider deviceContextProvider
92 protected val Logger logger
94 Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
95 Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
96 Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
99 MountProvisionService mountService
101 int messegeRetryCount = 5;
103 int messageTimeoutCount = 5 * 1000;
105 Set<QName> cachedCapabilities
108 var NetconfClientDispatcher dispatcher
110 static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
113 var SchemaSourceProvider<InputStream> remoteSourceProvider
115 DataBrokerService dataBroker
117 public new(String name) {
119 this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
120 this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
121 Collections.singletonMap(INVENTORY_ID, name)).toInstance;
125 checkState(dispatcher != null, "Dispatcher must be set.");
126 checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
127 checkState(eventExecutor != null, "Event executor must be set.");
129 val listener = new NetconfDeviceListener(this);
130 val task = startClientTask(dispatcher, listener)
131 return processingExecutor.submit(task) as Future<Void>;
135 def Optional<SchemaContext> getSchemaContext() {
136 if (deviceContextProvider == null) {
137 return Optional.absent();
139 return deviceContextProvider.currentContext;
142 private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
145 logger.info("Starting Netconf Client on: {}", socketAddress);
146 client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
147 logger.debug("Initial capabilities {}", initialCapabilities);
148 var SchemaSourceProvider<String> delegate;
149 if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
150 delegate = new NetconfRemoteSchemaSourceProvider(this);
151 } else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) {
152 delegate = new NetconfRemoteSchemaSourceProvider(this);
154 logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
155 delegate = SchemaSourceProviders.<String>noopProvider();
157 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
158 deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
159 deviceContextProvider.createContextFromCapabilities(initialCapabilities);
160 if (mountInstance != null && schemaContext.isPresent) {
161 mountInstance.schemaContext = schemaContext.get();
162 val operations = schemaContext.get().operations;
163 for (rpc : operations) {
164 mountInstance.addRpcImplementation(rpc.QName, this);
168 if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
169 confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
170 operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
171 commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
173 } catch (Exception e) {
174 logger.error("Netconf client NOT started. ", e)
179 private def updateDeviceState() {
180 val transaction = dataBroker.beginTransaction
182 val it = ImmutableCompositeNode.builder
183 setQName(INVENTORY_NODE)
184 addLeaf(INVENTORY_ID, name)
185 addLeaf(INVENTORY_CONNECTED, client.clientSession.up)
187 logger.debug("Client capabilities {}", client.capabilities)
188 for (capability : client.capabilities) {
189 addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
192 logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
193 transaction.putOperationalData(path, it.toInstance)
194 logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
195 val transactionStatus = transaction.commit.get;
197 if (transactionStatus.successful) {
198 logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.")
200 logger.debug("Update device state transaction " + transaction.identifier + " FAILED!")
201 logger.debug("Update device state transaction status " + transaction.status)
205 override readConfigurationData(InstanceIdentifier path) {
206 val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
207 wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get();
208 val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
209 return data?.findNode(path) as CompositeNode;
212 override readOperationalData(InstanceIdentifier path) {
213 val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get();
214 val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
215 return data?.findNode(path) as CompositeNode;
218 override getSupportedRpcs() {
219 Collections.emptySet;
222 // def createSubscription(String streamName) {
223 // val it = ImmutableCompositeNode.builder()
224 // QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
225 // addLeaf("stream", streamName);
226 // invokeRpc(QName, toInstance())
229 override invokeRpc(QName rpc, CompositeNode input) {
231 val message = rpc.toRpcMessage(input,schemaContext);
232 val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
233 return Futures.immediateFuture(result.toRpcResult(rpc, schemaContext));
234 } catch (Exception e) {
235 logger.error("Rpc was not processed correctly.", e)
240 def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
241 logger.debug("Send message {}",XmlUtil.toString(message.document))
242 val result = client.sendMessage(message, retryCount, timeout);
243 NetconfMapping.checkValidReply(message, result)
247 override getProviderFunctionality() {
251 override onSessionInitiated(ProviderSession session) {
252 dataBroker = session.getService(DataBrokerService);
254 val transaction = dataBroker.beginTransaction
255 if (transaction.operationalNodeNotExisting) {
256 transaction.putOperationalData(path, nodeWithId)
258 if (transaction.configurationNodeNotExisting) {
259 transaction.putConfigurationData(path, nodeWithId)
261 transaction.commit().get();
262 mountService = session.getService(MountProvisionService);
263 mountInstance = mountService?.createOrGetMountPoint(path);
266 def getNodeWithId() {
267 val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
268 return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
271 def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
272 return null === transaction.readConfigurationData(path);
275 def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
276 return null === transaction.readOperationalData(path);
279 static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
281 var Node<?> current = node;
282 for (arg : identifier.path) {
283 if (current instanceof SimpleNode<?>) {
285 } else if (current instanceof CompositeNode) {
286 val currentComposite = (current as CompositeNode);
288 current = currentComposite.getFirstCompositeByName(arg.nodeType);
289 if(current == null) {
290 current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
292 if(current == null) {
293 current = currentComposite.getFirstSimpleByName(arg.nodeType);
295 if (current == null) {
296 current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
297 } if (current == null) {
305 override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
306 val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
307 twoPhaseCommit.prepare()
308 return twoPhaseCommit;
311 def getInitialCapabilities() {
312 val capabilities = client?.capabilities;
313 if (capabilities == null) {
316 if (cachedCapabilities == null) {
317 cachedCapabilities = FluentIterable.from(capabilities).filter[
318 contains("?") && contains("module=") && contains("revision=")].transform [
319 val parts = split("\\?");
320 val namespace = parts.get(0);
321 val queryParams = FluentIterable.from(parts.get(1).split("&"));
322 var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
323 val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
324 if (revision === null) {
325 logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
326 revision = queryParams.findFirst[startsWith("&revision=")]?.replaceAll("revision=", "");
327 if (revision != null) {
328 logger.warn("Netconf device returned revision incorectly escaped for {}", it)
331 if (revision == null) {
332 return QName.create(URI.create(namespace), null, moduleName);
334 return QName.create(namespace, revision, moduleName);
337 return cachedCapabilities;
341 confReaderReg?.close()
342 operReaderReg?.close()
347 package class NetconfDeviceSchemaContextProvider {
350 val NetconfDevice device;
353 val SchemaSourceProvider<InputStream> sourceProvider;
356 var Optional<SchemaContext> currentContext;
358 new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
360 _sourceProvider = sourceProvider
361 _currentContext = Optional.absent();
364 def createContextFromCapabilities(Iterable<QName> capabilities) {
365 val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
366 if (!sourceContext.missingSources.empty) {
367 device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
369 device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
370 val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
371 if (!sourceContext.validSources.empty) {
372 val schemaContext = tryToCreateContext(modelsToParse);
373 currentContext = Optional.fromNullable(schemaContext);
375 currentContext = Optional.absent();
377 if (currentContext.present) {
378 device.logger.debug("Schema context successfully created.");
383 def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
384 val parser = new YangParserImpl();
387 val models = parser.parseYangModelsFromStreams(modelsToParse);
388 val result = parser.resolveSchemaContext(models);
390 } catch (Exception e) {
391 device.logger.debug("Error occured during parsing YANG schemas", e);