bfe352ad41322cf78404426a5582afc22a4e074d
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
1 package org.opendaylight.controller.sal.connect.netconf
2
3 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
4 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.yang.data.api.CompositeNode
7 import org.opendaylight.controller.netconf.client.NetconfClient
8 import org.opendaylight.controller.sal.core.api.RpcImplementation
9 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
10 import java.net.InetSocketAddress
11 import org.opendaylight.yangtools.yang.data.api.Node
12 import org.opendaylight.yangtools.yang.data.api.SimpleNode
13 import org.opendaylight.yangtools.yang.common.QName
14 import java.util.Collections
15 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
16 import org.opendaylight.yangtools.concepts.Registration
17 import org.opendaylight.controller.sal.core.api.Provider
18 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
19 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
20 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*;
21 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
22 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
23 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
24 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
25 import org.opendaylight.protocol.framework.ReconnectStrategy
26 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
27 import org.opendaylight.controller.md.sal.common.api.data.DataModification
28 import com.google.common.collect.FluentIterable
29 import org.opendaylight.yangtools.yang.model.api.SchemaContext
30 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState
32 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
33 import java.io.InputStream
34 import org.slf4j.LoggerFactory
35 import org.slf4j.Logger
36 import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener
37 import org.opendaylight.controller.netconf.client.NetconfClientSession
38 import org.opendaylight.controller.netconf.api.NetconfMessage
39 import io.netty.util.concurrent.EventExecutor
40
41 import java.util.Map
42 import java.util.Set
43 import com.google.common.collect.ImmutableMap
44
45 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
46 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
47 import com.google.common.base.Optional
48 import com.google.common.collect.ImmutableList
49 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
50 import static com.google.common.base.Preconditions.*;
51 import java.util.concurrent.ExecutorService
52 import java.util.concurrent.Future
53 import org.opendaylight.controller.netconf.client.NetconfClientSessionListener
54 import io.netty.util.concurrent.Promise
55 import org.opendaylight.controller.netconf.util.xml.XmlElement
56 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants
57 import java.util.concurrent.ExecutionException
58 import java.util.concurrent.locks.ReentrantLock
59
60 class NetconfDevice implements Provider, // 
61 DataReader<InstanceIdentifier, CompositeNode>, //
62 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
63 RpcImplementation, //
64 AutoCloseable {
65
66     var NetconfClient client;
67
68     @Property
69     var InetSocketAddress socketAddress;
70
71     @Property
72     var MountProvisionInstance mountInstance;
73
74     @Property
75     var EventExecutor eventExecutor;
76
77     @Property
78     var ExecutorService processingExecutor;
79
80     @Property
81     var InstanceIdentifier path;
82
83     @Property
84     var ReconnectStrategy reconnectStrategy;
85
86     @Property
87     var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
88
89     private NetconfDeviceSchemaContextProvider schemaContextProvider
90
91     protected val Logger logger
92
93     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
94     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
95     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
96
97     val String name
98     MountProvisionService mountService
99
100     int messegeRetryCount = 5;
101
102     int messageTimeoutCount = 5 * 1000;
103
104     Set<QName> cachedCapabilities
105
106     @Property
107     var NetconfClientDispatcher dispatcher
108     
109     static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
110
111     public new(String name) {
112         this.name = name;
113         this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
114         this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
115             Collections.singletonMap(INVENTORY_ID, name)).toInstance;
116     }
117
118     def start() {
119         checkState(dispatcher != null, "Dispatcher must be set.");
120         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
121         checkState(eventExecutor != null, "Event executor must be set.");
122
123         val listener = new NetconfDeviceListener(this,eventExecutor);
124         val task = startClientTask(dispatcher, listener)
125         if(mountInstance != null) {
126             confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
127             operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
128         }
129         return processingExecutor.submit(task) as Future<Void>;
130
131     //commitHandlerReg = mountInstance.registerCommitHandler(path,this);
132     }
133
134     def Optional<SchemaContext> getSchemaContext() {
135         if (schemaContextProvider == null) {
136             return Optional.absent();
137         }
138         return schemaContextProvider.currentContext;
139     }
140
141     private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
142         return [ |
143             logger.info("Starting Netconf Client on: {}", socketAddress);
144             client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
145             logger.debug("Initial capabilities {}", initialCapabilities);
146             var SchemaSourceProvider<String> delegate;
147             if (initialCapabilities.contains(NetconfMapping.IETF_NETCONF_MONITORING_MODULE)) {
148                 delegate = new NetconfDeviceSchemaSourceProvider(this);
149             } else {
150                 logger.info("Device does not support IETF Netconf Monitoring.", socketAddress);
151                 delegate = SchemaSourceProviders.<String>noopProvider();
152             }
153             val sourceProvider = schemaSourceProvider.createInstanceFor(delegate);
154             schemaContextProvider = new NetconfDeviceSchemaContextProvider(this, sourceProvider);
155             schemaContextProvider.createContextFromCapabilities(initialCapabilities);
156             if (mountInstance != null && schemaContext.isPresent) {
157                 mountInstance.schemaContext = schemaContext.get();
158             }
159         ]
160     }
161
162     override readConfigurationData(InstanceIdentifier path) {
163         val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
164             wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
165         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
166         return data?.findNode(path) as CompositeNode;
167     }
168
169     override readOperationalData(InstanceIdentifier path) {
170         val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
171         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
172         return data?.findNode(path) as CompositeNode;
173     }
174
175     override getSupportedRpcs() {
176         Collections.emptySet;
177     }
178     
179     def createSubscription(String streamName) {
180         val it = ImmutableCompositeNode.builder()
181         QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
182         addLeaf("stream",streamName);
183         invokeRpc(QName,toInstance())
184     }
185
186     override invokeRpc(QName rpc, CompositeNode input) {
187         val message = rpc.toRpcMessage(input);
188         val result = client.sendMessage(message, messegeRetryCount, messageTimeoutCount);
189         return result.toRpcResult();
190     }
191
192     override getProviderFunctionality() {
193         Collections.emptySet
194     }
195
196     override onSessionInitiated(ProviderSession session) {
197         val dataBroker = session.getService(DataBrokerService);
198
199         val transaction = dataBroker.beginTransaction
200         if (transaction.operationalNodeNotExisting) {
201             transaction.putOperationalData(path, nodeWithId)
202         }
203         if (transaction.configurationNodeNotExisting) {
204             transaction.putConfigurationData(path, nodeWithId)
205         }
206         transaction.commit().get();
207         mountService = session.getService(MountProvisionService);
208         mountInstance = mountService?.createOrGetMountPoint(path);
209     }
210
211     def getNodeWithId() {
212         val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
213         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
214     }
215
216     def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
217         return null === transaction.readConfigurationData(path);
218     }
219
220     def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
221         return null === transaction.readOperationalData(path);
222     }
223
224     def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
225
226         var Node<?> current = node;
227         for (arg : identifier.path) {
228             if (current instanceof SimpleNode<?>) {
229                 return null;
230             } else if (current instanceof CompositeNode) {
231                 val currentComposite = (current as CompositeNode);
232
233                 current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
234                 if (current == null) {
235                     current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
236                 }
237                 if (current == null) {
238                     return null;
239                 }
240             }
241         }
242         return current;
243     }
244
245     override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
246         throw new UnsupportedOperationException("TODO: auto-generated method stub")
247     }
248
249     def getInitialCapabilities() {
250         val capabilities = client?.capabilities;
251         if (capabilities == null) {
252             return null;
253         }
254         if (cachedCapabilities == null) {
255             cachedCapabilities = FluentIterable.from(capabilities).filter[
256                 contains("?") && contains("module=") && contains("revision=")].transform [
257                 val parts = split("\\?");
258                 val namespace = parts.get(0);
259                 val queryParams = FluentIterable.from(parts.get(1).split("&"));
260                 val revision = queryParams.findFirst[startsWith("revision=")].replaceAll("revision=", "");
261                 val moduleName = queryParams.findFirst[startsWith("module=")].replaceAll("module=", "");
262                 return QName.create(namespace, revision, moduleName);
263             ].toSet();
264         }
265         return cachedCapabilities;
266     }
267
268     override close() {
269         confReaderReg?.close()
270         operReaderReg?.close()
271         client?.close()
272     }
273
274 }
275
276 package class NetconfDeviceListener extends NetconfClientSessionListener {
277
278     val NetconfDevice device
279     val EventExecutor eventExecutor
280
281     new(NetconfDevice device,EventExecutor eventExecutor) {
282         this.device = device
283         this.eventExecutor = eventExecutor
284     }
285
286     var Promise<NetconfMessage> messagePromise;
287     val promiseLock = new ReentrantLock;
288     
289     override onMessage(NetconfClientSession session, NetconfMessage message) {
290         if (isNotification(message)) {
291             onNotification(session, message);
292         } else try {
293             promiseLock.lock
294             if (messagePromise != null) {
295                 messagePromise.setSuccess(message);
296                 messagePromise = null;
297             }
298         } finally {
299             promiseLock.unlock
300         }
301     }
302
303     /**
304      * Method intended to customize notification processing.
305      * 
306      * @param session
307      *            {@see
308      *            NetconfClientSessionListener#onMessage(NetconfClientSession,
309      *            NetconfMessage)}
310      * @param message
311      *            {@see
312      *            NetconfClientSessionListener#onMessage(NetconfClientSession,
313      *            NetconfMessage)}
314      */
315     def void onNotification(NetconfClientSession session, NetconfMessage message) {
316         device.logger.debug("Received NETCONF notification.",message);
317         val domNotification = message?.toCompositeNode?.notificationBody;
318         if(domNotification != null) {
319             device?.mountInstance?.publish(domNotification);
320         }
321     }
322     
323     private static def CompositeNode getNotificationBody(CompositeNode node) {
324         for(child : node.children) {
325             if(child instanceof CompositeNode) {
326                 return child as CompositeNode;
327             }
328         }
329     }
330
331     override getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
332         val promise = promiseReply();
333         val messageAvailable = promise.await(attempts + attemptMsDelay);
334         if (messageAvailable) {
335             try {
336                 return promise.get();
337             } catch (ExecutionException e) {
338                 throw new IllegalStateException(e);
339             }
340         }
341
342         throw new IllegalStateException("Unsuccessful after " + attempts + " attempts.");
343
344     // throw new TimeoutException("Message was not received on time.");
345     }
346
347     def Promise<NetconfMessage> promiseReply() {
348         promiseLock.lock
349         try {
350         if (messagePromise == null) {
351             messagePromise = eventExecutor.newPromise();
352             return messagePromise;
353         }
354         return messagePromise;
355         } finally {
356             promiseLock.unlock
357         }
358     }
359
360     def boolean isNotification(NetconfMessage message) {
361         val xmle = XmlElement.fromDomDocument(message.getDocument());
362         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName());
363     }
364 }
365
366 package class NetconfDeviceSchemaContextProvider {
367
368     @Property
369     val NetconfDevice device;
370
371     @Property
372     val SchemaSourceProvider<InputStream> sourceProvider;
373
374     @Property
375     var Optional<SchemaContext> currentContext;
376
377     new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
378         _device = device
379         _sourceProvider = sourceProvider
380     }
381
382     def createContextFromCapabilities(Iterable<QName> capabilities) {
383
384         val modelsToParse = ImmutableMap.<QName, InputStream>builder();
385         for (cap : capabilities) {
386             val source = sourceProvider.getSchemaSource(cap.localName, Optional.fromNullable(cap.formattedRevision));
387             if (source.present) {
388                 modelsToParse.put(cap, source.get());
389             }
390         }
391         val context = tryToCreateContext(modelsToParse.build);
392         currentContext = Optional.fromNullable(context);
393     }
394
395     def SchemaContext tryToCreateContext(Map<QName, InputStream> modelsToParse) {
396         val parser = new YangParserImpl();
397         try {
398             val models = parser.parseYangModelsFromStreams(ImmutableList.copyOf(modelsToParse.values));
399             val result = parser.resolveSchemaContext(models);
400             return result;
401         } catch (Exception e) {
402             device.logger.debug("Error occured during parsing YANG schemas", e);
403             return null;
404         }
405     }
406 }
407
408 package class NetconfDeviceSchemaSourceProvider implements SchemaSourceProvider<String> {
409
410     val NetconfDevice device;
411
412     new(NetconfDevice device) {
413         this.device = device;
414     }
415
416     override getSchemaSource(String moduleName, Optional<String> revision) {
417         val it = ImmutableCompositeNode.builder() //
418         setQName(QName::create(NetconfState.QNAME, "get-schema")) //
419         addLeaf("format", "yang")
420         addLeaf("identifier", moduleName)
421         if (revision.present) {
422             addLeaf("version", revision.get())
423         }
424
425         device.logger.info("Loading YANG schema source for {}:{}", moduleName, revision)
426         val schemaReply = device.invokeRpc(getQName(), toInstance());
427
428         if (schemaReply.successful) {
429             val schemaBody = schemaReply.result.getFirstSimpleByName(
430                 QName::create(NetconfState.QNAME.namespace, null, "data"))?.value;
431             device.logger.info("YANG Schema successfully received for: {}:{}", moduleName, revision);
432             return Optional.of(schemaBody as String);
433         }
434         return Optional.absent();
435     }
436 }