1 package org.opendaylight.controller.sal.connect.netconf
3 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
4 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.yang.data.api.CompositeNode
7 import org.opendaylight.controller.netconf.client.NetconfClient
8 import org.opendaylight.controller.sal.core.api.RpcImplementation
9 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
10 import java.net.InetSocketAddress
11 import org.opendaylight.yangtools.yang.data.api.Node
12 import org.opendaylight.yangtools.yang.data.api.SimpleNode
13 import org.opendaylight.yangtools.yang.common.QName
14 import java.util.Collections
15 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
16 import org.opendaylight.yangtools.concepts.Registration
17 import org.opendaylight.controller.sal.core.api.Provider
18 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
19 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
20 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*;
21 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
22 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
23 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
24 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
25 import org.opendaylight.protocol.framework.ReconnectStrategy
26 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
27 import org.opendaylight.controller.md.sal.common.api.data.DataModification
28 import com.google.common.collect.FluentIterable
29 import org.opendaylight.yangtools.yang.model.api.SchemaContext
30 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState
32 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
33 import java.io.InputStream
34 import org.slf4j.LoggerFactory
35 import org.slf4j.Logger
36 import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener
37 import org.opendaylight.controller.netconf.client.NetconfClientSession
38 import org.opendaylight.controller.netconf.api.NetconfMessage
39 import io.netty.util.concurrent.EventExecutor
43 import com.google.common.collect.ImmutableMap
45 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
46 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
47 import com.google.common.base.Optional
48 import com.google.common.collect.ImmutableList
49 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
50 import static com.google.common.base.Preconditions.*;
51 import java.util.concurrent.ExecutorService
52 import java.util.concurrent.Future
53 import org.opendaylight.controller.netconf.client.NetconfClientSessionListener
54 import io.netty.util.concurrent.Promise
55 import org.opendaylight.controller.netconf.util.xml.XmlElement
56 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants
57 import java.util.concurrent.ExecutionException
58 import java.util.concurrent.locks.ReentrantLock
/**
 * Device backed by a NETCONF client. The visible part of the declaration shows it acting as
 * a broker Provider and as DataReader / DataCommitHandler for InstanceIdentifier-addressed
 * CompositeNode data.
 * NOTE(review): the end of the implements clause and the lines between most fields are not
 * visible in this extract.
 */
60 class NetconfDevice implements Provider, //
61 DataReader<InstanceIdentifier, CompositeNode>, //
62 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
// Client created in startClientTask and used by invokeRpc to send messages.
66 var NetconfClient client;
// Address handed to NetconfClient.clientFor when the client is created.
69 var InetSocketAddress socketAddress;
// Mount point obtained in onSessionInitiated; readers are registered on it and notifications published to it.
72 var MountProvisionInstance mountInstance;
// Executor handed to the NetconfDeviceListener (used there to create promises).
75 var EventExecutor eventExecutor;
// Executor on which the client start-up task is submitted.
78 var ExecutorService processingExecutor;
// Inventory path of this device, built in the constructor from the device name.
81 var InstanceIdentifier path;
// Strategy handed to NetconfClient.clientFor.
84 var ReconnectStrategy reconnectStrategy;
// Caching provider from which a per-device source provider is derived via createInstanceFor.
87 var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
// Created in startClientTask; null until then (see getSchemaContext).
89 private NetconfDeviceSchemaContextProvider schemaContextProvider
// Per-device logger, named "<class name>#<device name>" in the constructor.
91 protected val Logger logger
// Registrations returned when this device is registered as reader on the mount point.
93 Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
94 Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
// NOTE(review): never assigned in the visible code; the only assignment is commented out.
95 Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
98 MountProvisionService mountService
// Second argument of client.sendMessage in invokeRpc.
// NOTE(review): "messege" is a typo for "message"; renaming must be coordinated with invokeRpc.
100 int messegeRetryCount = 5;
// Third argument of client.sendMessage in invokeRpc. NOTE(review): presumably milliseconds — confirm.
102 int messageTimeoutCount = 5 * 1000;
// Lazily computed result of getInitialCapabilities.
104 Set<QName> cachedCapabilities
// Dispatcher handed to NetconfClient.clientFor; must be set before start-up (checked via checkState).
107 var NetconfClientDispatcher dispatcher
// Empty instance identifier used as the path when registering the readers.
109 static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
/**
 * Creates a device for the given name: sets up a logger named "<class name>#<name>" and
 * builds the inventory path INVENTORY_PATH / INVENTORY_NODE[INVENTORY_ID = name].
 * NOTE(review): source line 112 is not visible in this extract; presumably it stores the
 * name itself — confirm.
 */
111 public new(String name) {
113 this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
114 this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
115 Collections.singletonMap(INVENTORY_ID, name)).toInstance;
// Body of the start-up routine (its declaration line is not visible in this extract).
// Fails with IllegalStateException (Guava checkState) unless dispatcher, schema source
// provider and event executor have been set.
119 checkState(dispatcher != null, "Dispatcher must be set.");
120 checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
121 checkState(eventExecutor != null, "Event executor must be set.");
// The listener receives session messages; the task performs the actual client start-up.
123 val listener = new NetconfDeviceListener(this,eventExecutor);
124 val task = startClientTask(dispatcher, listener)
// Readers are registered only when a mount point is already available.
125 if(mountInstance != null) {
126 confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
127 operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
// The start-up task runs asynchronously on processingExecutor; the caller gets its Future.
// NOTE(review): processingExecutor is not covered by the checkState calls above.
129 return processingExecutor.submit(task) as Future<Void>;
// NOTE(review): commit-handler registration is disabled; commitHandlerReg stays unset.
131 //commitHandlerReg = mountInstance.registerCommitHandler(path,this);
/**
 * Returns the current schema context of this device.
 *
 * @return the context held by the schema context provider, or an absent Optional while no
 *         provider has been created yet
 */
def Optional<SchemaContext> getSchemaContext() {
    val provider = schemaContextProvider
    if (provider != null) {
        return provider.currentContext
    }
    return Optional.absent()
}
/**
 * Builds the task that starts the NETCONF client and derives the device schema context.
 * NOTE(review): source line 142 is not visible; no Runnable is constructed in the visible
 * lines, so the statements below are presumably wrapped in a lambda — confirm.
 *
 * @param dispatcher dispatcher handed to NetconfClient.clientFor
 * @param listener   session listener handed to NetconfClient.clientFor
 */
141 private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
143 logger.info("Starting Netconf Client on: {}", socketAddress);
144 client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
145 logger.debug("Initial capabilities {}", initialCapabilities);
// Schema sources are fetched from the device only if it advertises the monitoring module;
// otherwise a no-op provider is used as the delegate.
146 var SchemaSourceProvider<String> delegate;
147 if (initialCapabilities.contains(NetconfMapping.IETF_NETCONF_MONITORING_MODULE)) {
148 delegate = new NetconfDeviceSchemaSourceProvider(this);
// NOTE(review): the format string has no {} placeholder, so socketAddress is not rendered.
150 logger.info("Device does not support IETF Netconf Monitoring.", socketAddress);
151 delegate = SchemaSourceProviders.<String>noopProvider();
// Wrap the delegate with the caching provider, then build the schema context from the capabilities.
153 val sourceProvider = schemaSourceProvider.createInstanceFor(delegate);
154 schemaContextProvider = new NetconfDeviceSchemaContextProvider(this, sourceProvider);
155 schemaContextProvider.createContextFromCapabilities(initialCapabilities);
// Publish the schema context to the mount point when both are available.
156 if (mountInstance != null && schemaContext.isPresent) {
157 mountInstance.schemaContext = schemaContext.get();
/**
 * Reads configuration data for {@code path} by invoking the NETCONF_GET_CONFIG_QNAME RPC
 * with CONFIG_SOURCE_RUNNING and a filter derived from the path.
 *
 * @param path identifier of the requested node
 * @return the node found under the reply's NETCONF_DATA_QNAME element, or null when the
 *         reply carries no such element
 */
override readConfigurationData(InstanceIdentifier path) {
    val request = wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())
    val reply = invokeRpc(NETCONF_GET_CONFIG_QNAME, request)
    val dataNode = reply.result.getFirstCompositeByName(NETCONF_DATA_QNAME)
    if (dataNode == null) {
        return null
    }
    return dataNode.findNode(path) as CompositeNode
}
/**
 * Reads operational data for {@code path} by invoking the NETCONF_GET_QNAME RPC with a
 * filter derived from the path.
 *
 * @param path identifier of the requested node
 * @return the node found under the reply's NETCONF_DATA_QNAME element, or null when the
 *         reply carries no such element
 */
override readOperationalData(InstanceIdentifier path) {
    val request = wrap(NETCONF_GET_QNAME, path.toFilterStructure())
    val reply = invokeRpc(NETCONF_GET_QNAME, request)
    val dataNode = reply.result.getFirstCompositeByName(NETCONF_DATA_QNAME)
    if (dataNode == null) {
        return null
    }
    return dataNode.findNode(path) as CompositeNode
}
/**
 * Reports the RPCs supported by this device; currently always an empty set.
 */
override getSupportedRpcs() {
    return Collections.emptySet()
}
/**
 * Invokes the NETCONF_CREATE_SUBSCRIPTION_QNAME RPC with a single "stream" leaf.
 *
 * @param streamName value of the "stream" leaf
 * @return the result of invokeRpc
 */
179 def createSubscription(String streamName) {
// The builder is bound to the implicit receiver "it", so the unqualified calls below
// (QName assignment, addLeaf, toInstance) all target the builder.
180 val it = ImmutableCompositeNode.builder()
181 QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
182 addLeaf("stream",streamName);
183 invokeRpc(QName,toInstance())
/**
 * Sends the RPC to the device through the NETCONF client and converts the reply.
 *
 * @param rpc   name of the RPC
 * @param input RPC input, converted together with {@code rpc} into a NETCONF message
 * @return the client's reply converted by toRpcResult
 */
override invokeRpc(QName rpc, CompositeNode input) {
    val request = rpc.toRpcMessage(input)
    return client.sendMessage(request, messegeRetryCount, messageTimeoutCount).toRpcResult()
}
// Provider callback. NOTE(review): the body of this method is not visible in this extract.
192 override getProviderFunctionality() {
/**
 * Provider callback: makes sure the inventory node of this device exists in both the
 * operational and the configuration data, then obtains the mount point for the device path.
 *
 * @param session broker session used to look up DataBrokerService and MountProvisionService
 */
196 override onSessionInitiated(ProviderSession session) {
197 val dataBroker = session.getService(DataBrokerService);
199 val transaction = dataBroker.beginTransaction
// Write the node only where it is missing.
200 if (transaction.operationalNodeNotExisting) {
201 transaction.putOperationalData(path, nodeWithId)
203 if (transaction.configurationNodeNotExisting) {
204 transaction.putConfigurationData(path, nodeWithId)
// Blocks until the commit future completes.
206 transaction.commit().get();
207 mountService = session.getService(MountProvisionService);
// Null-safe: mountInstance stays null when no mount service is available.
208 mountInstance = mountService?.createOrGetMountPoint(path);
/**
 * Builds the inventory node for this device: an INVENTORY_NODE composite whose only child
 * is an INVENTORY_ID leaf holding the device name.
 */
def getNodeWithId() {
    val idLeaf = new SimpleNodeTOImpl(INVENTORY_ID, null, name)
    val children = Collections.singletonList(idLeaf)
    return new CompositeNodeTOImpl(INVENTORY_NODE, null, children)
}
/**
 * Checks whether the configuration data contains no node at this device's path.
 *
 * @param transaction transaction used for the read
 * @return true when reading the path yields null
 */
def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
    val existing = transaction.readConfigurationData(path)
    return existing === null
}
/**
 * Checks whether the operational data contains no node at this device's path.
 *
 * @param transaction transaction used for the read
 * @return true when reading the path yields null
 */
def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
    val existing = transaction.readOperationalData(path)
    return existing === null
}
/**
 * Walks {@code node} along the path arguments of {@code identifier}, descending one child
 * per argument.
 * NOTE(review): the bodies of several branches and the final return are not visible in this
 * extract (source lines 229, 236, 238-243), so the result for unmatched paths cannot be
 * stated from here.
 *
 * @param node       node at which the search starts
 * @param identifier path to follow
 */
224 def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
226 var Node<?> current = node;
227 for (arg : identifier.path) {
// A simple node has no children to descend into (branch body not visible).
228 if (current instanceof SimpleNode<?>) {
230 } else if (current instanceof CompositeNode) {
231 val currentComposite = (current as CompositeNode);
// Children are looked up by name without revision: composite child first, simple child as fallback.
233 current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
234 if (current == null) {
235 current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
237 if (current == null) {
/**
 * Commit handling is not implemented.
 *
 * @param modification ignored
 * @throws UnsupportedOperationException always
 */
override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
    val reason = "TODO: auto-generated method stub"
    throw new UnsupportedOperationException(reason)
}
/**
 * Returns the client's capabilities as QNames, computing them on first use and caching the
 * result in cachedCapabilities.
 * NOTE(review): the handling of missing capabilities (source lines 252-253) and the end of
 * the transform chain (lines 263-264) are not visible in this extract.
 */
249 def getInitialCapabilities() {
// Null-safe: yields null while no client exists.
250 val capabilities = client?.capabilities;
251 if (capabilities == null) {
254 if (cachedCapabilities == null) {
// Only capability strings carrying a query part with both module= and revision= are converted.
255 cachedCapabilities = FluentIterable.from(capabilities).filter[
256 contains("?") && contains("module=") && contains("revision=")].transform [
// The text before "?" is used as namespace, the text after it is split into "&"-separated parameters.
257 val parts = split("\\?");
258 val namespace = parts.get(0);
259 val queryParams = FluentIterable.from(parts.get(1).split("&"));
// NOTE(review): the filter uses contains() while these lookups use startsWith(); if no
// parameter starts with the prefix, findFirst presumably yields null and replaceAll fails — verify.
260 val revision = queryParams.findFirst[startsWith("revision=")].replaceAll("revision=", "");
261 val moduleName = queryParams.findFirst[startsWith("module=")].replaceAll("module=", "");
262 return QName.create(namespace, revision, moduleName);
265 return cachedCapabilities;
// Closes the reader registrations if they were created (null-safe calls).
// NOTE(review): the declaration of the enclosing method is not visible in this extract.
269 confReaderReg?.close()
270 operReaderReg?.close()
/**
 * Session listener of a NetconfDevice: forwards notifications to the device's mount point
 * and hands other messages to whoever waits on the current message promise.
 */
276 package class NetconfDeviceListener extends NetconfClientSessionListener {
// Device whose logger and mount point are used by this listener.
278 val NetconfDevice device
// Used to create the promise returned by promiseReply.
279 val EventExecutor eventExecutor
// NOTE(review): source line 282 is not visible; presumably it assigns the device field — confirm.
281 new(NetconfDevice device,EventExecutor eventExecutor) {
283 this.eventExecutor = eventExecutor
// Promise for the next message; completed and reset in onMessage, created in promiseReply.
286 var Promise<NetconfMessage> messagePromise;
// NOTE(review): no use of this lock is visible in this extract.
287 val promiseLock = new ReentrantLock;
/**
 * Dispatches an incoming message: notifications go to onNotification, and a pending message
 * promise is completed with the message and then cleared.
 * NOTE(review): source lines 292-293 and 297 onward are not visible, so it cannot be told
 * from here whether the promise branch is an else-branch or whether it runs under promiseLock.
 */
289 override onMessage(NetconfClientSession session, NetconfMessage message) {
290 if (isNotification(message)) {
291 onNotification(session, message);
294 if (messagePromise != null) {
295 messagePromise.setSuccess(message);
// Reset so that the next promiseReply call creates a fresh promise.
296 messagePromise = null;
304 * Method intended to customize notification processing.
308 * NetconfClientSessionListener#onMessage(NetconfClientSession,
312 * NetconfClientSessionListener#onMessage(NetconfClientSession,
// Processes a NETCONF notification: logs it, converts it to a composite node, takes the
// notification body from it and, when a body is present, publishes it to the device's
// mount point. Conversion and publishing are null-safe, so a null message, a failed
// conversion or a missing mount point results in nothing being published.
//
// session - session on which the notification arrived (not used here)
// message - the notification message
def void onNotification(NetconfClientSession session, NetconfMessage message) {
    // The message is passed as a logging argument, so the format string needs a {}
    // placeholder for it to appear in the log output.
    device.logger.debug("Received NETCONF notification: {}", message);
    val domNotification = message?.toCompositeNode?.notificationBody;
    if (domNotification != null) {
        device?.mountInstance?.publish(domNotification);
    }
}
/**
 * Returns the first child of {@code node} that is a CompositeNode.
 * NOTE(review): the end of the method is not visible in this extract, so the result when no
 * composite child exists cannot be stated from here.
 */
323 private static def CompositeNode getNotificationBody(CompositeNode node) {
324 for(child : node.children) {
325 if(child instanceof CompositeNode) {
326 return child as CompositeNode;
/**
 * Waits for the next message delivered to this listener and returns it.
 *
 * The total wait budget is {@code attempts * attemptMsDelay} milliseconds. The previous
 * implementation added the two values, mixing a count with a duration, which made the
 * effective timeout barely longer than a single delay.
 *
 * @param attempts       number of attempts the caller is willing to wait for
 * @param attemptMsDelay delay per attempt, in milliseconds
 * @return the received message
 * @throws InterruptedException  if the waiting thread is interrupted
 * @throws IllegalStateException if no message arrives within the budget, or if the promise
 *                               completed with a failure (the cause is preserved)
 */
override getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
    val promise = promiseReply();
    // Multiply in long arithmetic so that large arguments cannot overflow int.
    val timeoutMillis = (attempts as long) * attemptMsDelay;
    val messageAvailable = promise.await(timeoutMillis);
    if (messageAvailable) {
        try {
            return promise.get();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    throw new IllegalStateException("Unsuccessful after " + attempts + " attempts.");
}
/**
 * Returns the promise for the next message, creating it through the event executor when
 * none is pending.
 * NOTE(review): source lines 348-349 and 353-358 are not visible; presumably they guard
 * this section with promiseLock — confirm.
 */
347 def Promise<NetconfMessage> promiseReply() {
350 if (messagePromise == null) {
351 messagePromise = eventExecutor.newPromise();
352 return messagePromise;
354 return messagePromise;
/**
 * Tells whether the message is a NETCONF notification, i.e. whether the element built from
 * its document is named XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.
 *
 * @param message message to inspect
 * @return true for notifications
 */
def boolean isNotification(NetconfMessage message) {
    val elementName = XmlElement.fromDomDocument(message.getDocument()).getName()
    return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(elementName)
}
/**
 * Builds and holds the schema context of a device from the YANG sources supplied by a
 * schema source provider.
 */
366 package class NetconfDeviceSchemaContextProvider {
// Device whose logger is used to report parsing problems.
369 val NetconfDevice device;
// Source of YANG module streams, queried by module name and revision.
372 val SchemaSourceProvider<InputStream> sourceProvider;
// Result of the last createContextFromCapabilities call; absent when parsing produced no context.
375 var Optional<SchemaContext> currentContext;
// NOTE(review): source line 378 is not visible; presumably it assigns the device field — confirm.
377 new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
379 _sourceProvider = sourceProvider
/**
 * Fetches the schema source of every capability that the source provider can supply, parses
 * them and stores the resulting context in currentContext (absent when no context results).
 *
 * @param capabilities module QNames; local name and formatted revision identify each source
 */
382 def createContextFromCapabilities(Iterable<QName> capabilities) {
384 val modelsToParse = ImmutableMap.<QName, InputStream>builder();
385 for (cap : capabilities) {
386 val source = sourceProvider.getSchemaSource(cap.localName, Optional.fromNullable(cap.formattedRevision));
// Capabilities without an available source are skipped.
387 if (source.present) {
388 modelsToParse.put(cap, source.get());
391 val context = tryToCreateContext(modelsToParse.build);
392 currentContext = Optional.fromNullable(context);
/**
 * Parses the given YANG streams and resolves them into a schema context.
 * NOTE(review): the try line and the return statements are not visible in this extract; the
 * caller wraps the result with Optional.fromNullable, so a null result on failure is
 * presumably intended — confirm.
 *
 * @param modelsToParse YANG sources keyed by module QName; only the values are parsed
 */
395 def SchemaContext tryToCreateContext(Map<QName, InputStream> modelsToParse) {
396 val parser = new YangParserImpl();
398 val models = parser.parseYangModelsFromStreams(ImmutableList.copyOf(modelsToParse.values));
399 val result = parser.resolveSchemaContext(models);
// Any parsing failure is only logged, at debug level.
// NOTE(review): the input streams are not closed in the visible code.
401 } catch (Exception e) {
402 device.logger.debug("Error occured during parsing YANG schemas", e);
/**
 * Schema source provider that loads YANG sources from the device itself by invoking the
 * "get-schema" RPC.
 */
408 package class NetconfDeviceSchemaSourceProvider implements SchemaSourceProvider<String> {
// Device used to invoke the RPC and to log progress.
410 val NetconfDevice device;
412 new(NetconfDevice device) {
413 this.device = device;
/**
 * Loads the YANG source of a module from the device through the "get-schema" RPC.
 *
 * The request carries the leaves "format" = "yang", "identifier" = {@code moduleName} and,
 * when a revision is given, "version" = revision.
 *
 * @param moduleName name of the requested module
 * @param revision   optional revision of the requested module
 * @return the schema text, or an absent Optional when the RPC was not successful or the
 *         reply carries no "data" leaf value
 */
override getSchemaSource(String moduleName, Optional<String> revision) {
    // The builder is bound to the implicit receiver "it"; the unqualified calls target it.
    val it = ImmutableCompositeNode.builder() //
    setQName(QName::create(NetconfState.QNAME, "get-schema")) //
    addLeaf("format", "yang")
    addLeaf("identifier", moduleName)
    if (revision.present) {
        addLeaf("version", revision.get())
    }

    device.logger.info("Loading YANG schema source for {}:{}", moduleName, revision)
    val schemaReply = device.invokeRpc(getQName(), toInstance());

    if (schemaReply.successful) {
        val schemaBody = schemaReply.result.getFirstSimpleByName(
            QName::create(NetconfState.QNAME.namespace, null, "data"))?.value;
        // The null-safe lookup above yields null when the reply has no "data" leaf.
        // Optional.of(null) would throw NullPointerException, so report the source as absent.
        if (schemaBody == null) {
            return Optional.absent();
        }
        device.logger.info("YANG Schema successfully received for: {}:{}", moduleName, revision);
        return Optional.of(schemaBody as String);
    }
    return Optional.absent();
}