1 package org.opendaylight.controller.sal.connect.netconf
3 import com.google.common.base.Optional
4 import com.google.common.collect.FluentIterable
5 import io.netty.util.concurrent.EventExecutor
6 import java.io.InputStream
7 import java.net.InetSocketAddress
9 import java.util.Collections
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Future
14 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
15 import org.opendaylight.controller.md.sal.common.api.data.DataModification
16 import org.opendaylight.controller.md.sal.common.api.data.DataReader
17 import org.opendaylight.controller.netconf.api.NetconfMessage
18 import org.opendaylight.controller.netconf.client.NetconfClient
19 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
20 import org.opendaylight.controller.netconf.util.xml.XmlUtil
21 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
22 import org.opendaylight.controller.sal.core.api.Provider
23 import org.opendaylight.controller.sal.core.api.RpcImplementation
24 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
25 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
26 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
27 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
28 import org.opendaylight.protocol.framework.ReconnectStrategy
29 import org.opendaylight.yangtools.concepts.Registration
30 import org.opendaylight.yangtools.yang.common.QName
31 import org.opendaylight.yangtools.yang.data.api.CompositeNode
32 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
33 import org.opendaylight.yangtools.yang.data.api.Node
34 import org.opendaylight.yangtools.yang.data.api.SimpleNode
35 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
36 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
37 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext
39 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
40 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
41 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
42 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
43 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
44 import org.slf4j.Logger
45 import org.slf4j.LoggerFactory
47 import static com.google.common.base.Preconditions.*
48 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
50 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
52 class NetconfDevice implements Provider, //
53 DataReader<InstanceIdentifier, CompositeNode>, //
54 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
58 var NetconfClient client;
61 var InetSocketAddress socketAddress;
64 var MountProvisionInstance mountInstance;
67 var EventExecutor eventExecutor;
70 var ExecutorService processingExecutor;
73 var InstanceIdentifier path;
76 var ReconnectStrategy reconnectStrategy;
79 var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
82 private NetconfDeviceSchemaContextProvider deviceContextProvider
84 protected val Logger logger
86 Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
87 Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
88 Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
91 MountProvisionService mountService
93 int messegeRetryCount = 5;
95 int messageTimeoutCount = 5 * 1000;
97 Set<QName> cachedCapabilities
100 var NetconfClientDispatcher dispatcher
102 static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
105 var SchemaSourceProvider<InputStream> remoteSourceProvider
107 DataBrokerService dataBroker
109 public new(String name) {
111 this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
112 this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
113 Collections.singletonMap(INVENTORY_ID, name)).toInstance;
117 checkState(dispatcher != null, "Dispatcher must be set.");
118 checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
119 checkState(eventExecutor != null, "Event executor must be set.");
121 val listener = new NetconfDeviceListener(this, eventExecutor);
122 val task = startClientTask(dispatcher, listener)
123 if (mountInstance != null) {
124 commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
126 return processingExecutor.submit(task) as Future<Void>;
128 //commitHandlerReg = mountInstance.registerCommitHandler(path,this);
131 def Optional<SchemaContext> getSchemaContext() {
132 if (deviceContextProvider == null) {
133 return Optional.absent();
135 return deviceContextProvider.currentContext;
138 private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
141 logger.info("Starting Netconf Client on: {}", socketAddress);
142 client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
143 logger.debug("Initial capabilities {}", initialCapabilities);
144 var SchemaSourceProvider<String> delegate;
145 if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
146 delegate = new NetconfRemoteSchemaSourceProvider(this);
147 } else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) {
148 delegate = new NetconfRemoteSchemaSourceProvider(this);
150 logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
151 delegate = SchemaSourceProviders.<String>noopProvider();
153 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
154 deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
155 deviceContextProvider.createContextFromCapabilities(initialCapabilities);
156 if (mountInstance != null && schemaContext.isPresent) {
157 mountInstance.schemaContext = schemaContext.get();
160 if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
161 confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
162 operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
164 } catch (Exception e) {
165 logger.error("Netconf client NOT started. ", e)
170 private def updateDeviceState() {
171 val transaction = dataBroker.beginTransaction
173 val it = ImmutableCompositeNode.builder
174 setQName(INVENTORY_NODE)
175 addLeaf(INVENTORY_ID, name)
176 addLeaf(INVENTORY_CONNECTED, client.clientSession.up)
178 logger.debug("Client capabilities {}", client.capabilities)
179 for (capability : client.capabilities) {
180 addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
183 logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
184 transaction.putOperationalData(path, it.toInstance)
185 logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
186 val transactionStatus = transaction.commit.get;
188 if (transactionStatus.successful) {
189 logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.")
191 logger.debug("Update device state transaction " + transaction.identifier + " FAILED!")
192 logger.debug("Update device state transaction status " + transaction.status)
196 override readConfigurationData(InstanceIdentifier path) {
197 val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
198 wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
199 val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
200 return data?.findNode(path) as CompositeNode;
203 override readOperationalData(InstanceIdentifier path) {
204 val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
205 val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
206 return data?.findNode(path) as CompositeNode;
209 override getSupportedRpcs() {
210 Collections.emptySet;
213 def createSubscription(String streamName) {
214 val it = ImmutableCompositeNode.builder()
215 QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
216 addLeaf("stream", streamName);
217 invokeRpc(QName, toInstance())
220 override invokeRpc(QName rpc, CompositeNode input) {
222 val message = rpc.toRpcMessage(input,schemaContext);
223 val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
224 return result.toRpcResult(rpc, schemaContext);
226 } catch (Exception e) {
227 logger.error("Rpc was not processed correctly.", e)
232 def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
233 logger.debug("Send message {}",XmlUtil.toString(message.document))
234 val result = client.sendMessage(message, retryCount, timeout);
235 NetconfMapping.checkValidReply(message, result)
239 override getProviderFunctionality() {
243 override onSessionInitiated(ProviderSession session) {
244 dataBroker = session.getService(DataBrokerService);
246 val transaction = dataBroker.beginTransaction
247 if (transaction.operationalNodeNotExisting) {
248 transaction.putOperationalData(path, nodeWithId)
250 if (transaction.configurationNodeNotExisting) {
251 transaction.putConfigurationData(path, nodeWithId)
253 transaction.commit().get();
254 mountService = session.getService(MountProvisionService);
255 mountInstance = mountService?.createOrGetMountPoint(path);
258 def getNodeWithId() {
259 val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
260 return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
263 def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
264 return null === transaction.readConfigurationData(path);
267 def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
268 return null === transaction.readOperationalData(path);
271 static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
273 var Node<?> current = node;
274 for (arg : identifier.path) {
275 if (current instanceof SimpleNode<?>) {
277 } else if (current instanceof CompositeNode) {
278 val currentComposite = (current as CompositeNode);
280 current = currentComposite.getFirstCompositeByName(arg.nodeType);
281 if(current == null) {
282 current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
284 if(current == null) {
285 current = currentComposite.getFirstSimpleByName(arg.nodeType);
287 if (current == null) {
288 current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
289 } if (current == null) {
297 override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
298 val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
299 twoPhaseCommit.prepare()
300 return twoPhaseCommit;
303 def getInitialCapabilities() {
304 val capabilities = client?.capabilities;
305 if (capabilities == null) {
308 if (cachedCapabilities == null) {
309 cachedCapabilities = FluentIterable.from(capabilities).filter[
310 contains("?") && contains("module=") && contains("revision=")].transform [
311 val parts = split("\\?");
312 val namespace = parts.get(0);
313 val queryParams = FluentIterable.from(parts.get(1).split("&"));
314 var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
315 val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
316 if (revision === null) {
317 logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
318 revision = queryParams.findFirst[startsWith("&revision=")]?.replaceAll("revision=", "");
319 if (revision != null) {
320 logger.warn("Netconf device returned revision incorectly escaped for {}", it)
323 if (revision == null) {
324 return QName.create(URI.create(namespace), null, moduleName);
326 return QName.create(namespace, revision, moduleName);
329 return cachedCapabilities;
333 confReaderReg?.close()
334 operReaderReg?.close()
340 package class NetconfDeviceSchemaContextProvider {
343 val NetconfDevice device;
346 val SchemaSourceProvider<InputStream> sourceProvider;
349 var Optional<SchemaContext> currentContext;
351 new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
353 _sourceProvider = sourceProvider
354 _currentContext = Optional.absent();
357 def createContextFromCapabilities(Iterable<QName> capabilities) {
358 val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
359 if (!sourceContext.missingSources.empty) {
360 device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
362 device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
363 val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
364 if (!sourceContext.validSources.empty) {
365 val schemaContext = tryToCreateContext(modelsToParse);
366 currentContext = Optional.fromNullable(schemaContext);
368 currentContext = Optional.absent();
370 if (currentContext.present) {
371 device.logger.debug("Schema context successfully created.");
376 def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
377 val parser = new YangParserImpl();
380 val models = parser.parseYangModelsFromStreams(modelsToParse);
381 val result = parser.resolveSchemaContext(models);
383 } catch (Exception e) {
384 device.logger.debug("Error occured during parsing YANG schemas", e);