Merge "Switch to using yangtools version of mockito-configuration"
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
1 package org.opendaylight.controller.sal.connect.netconf
2
3 import com.google.common.base.Optional
4 import com.google.common.collect.FluentIterable
5 import io.netty.util.concurrent.EventExecutor
6 import java.io.InputStream
7 import java.net.InetSocketAddress
8 import java.net.URI
9 import java.util.Collections
10 import java.util.List
11 import java.util.Set
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Future
14 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
15 import org.opendaylight.controller.md.sal.common.api.data.DataModification
16 import org.opendaylight.controller.md.sal.common.api.data.DataReader
17 import org.opendaylight.controller.netconf.api.NetconfMessage
18 import org.opendaylight.controller.netconf.client.NetconfClient
19 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
20 import org.opendaylight.controller.netconf.util.xml.XmlUtil
21 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
22 import org.opendaylight.controller.sal.core.api.Provider
23 import org.opendaylight.controller.sal.core.api.RpcImplementation
24 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
25 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
26 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
27 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
28 import org.opendaylight.protocol.framework.ReconnectStrategy
29 import org.opendaylight.yangtools.concepts.Registration
30 import org.opendaylight.yangtools.yang.common.QName
31 import org.opendaylight.yangtools.yang.data.api.CompositeNode
32 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
33 import org.opendaylight.yangtools.yang.data.api.Node
34 import org.opendaylight.yangtools.yang.data.api.SimpleNode
35 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
36 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
37 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext
39 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
40 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
41 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
42 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
43 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
44 import org.slf4j.Logger
45 import org.slf4j.LoggerFactory
46
47 import static com.google.common.base.Preconditions.*
48 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
49
50 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
51
/**
 * Controller-side representation of one remote NETCONF server.
 *
 * <p>The device registers itself as configuration reader, operational reader and
 * commit handler on its mount point, and serves those requests by sending NETCONF
 * RPCs through a {@link NetconfClient}. The connection is established
 * asynchronously by the task submitted from {@link #start()}.
 *
 * <p>Expected life cycle as far as this class shows: construct, inject the
 * collaborators through the generated property setters, let the broker call
 * {@link #onSessionInitiated}, call {@link #start()}, and finally {@link #close()}.
 */
class NetconfDevice implements Provider,
        DataReader<InstanceIdentifier, CompositeNode>,
        DataCommitHandler<InstanceIdentifier, CompositeNode>,
        RpcImplementation,
        AutoCloseable {

    /** Empty identifier; readers and the commit handler are registered for the whole mount point. */
    static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();

    /** Client for the remote server; assigned by the connect task, null until then. */
    var NetconfClient client;

    /** Address of the remote NETCONF server. */
    @Property
    var InetSocketAddress socketAddress;

    /** Mount point the device is exposed on; resolved in onSessionInitiated when a mount service exists. */
    @Property
    var MountProvisionInstance mountInstance;

    /** Executor handed to the NetconfDeviceListener. */
    @Property
    var EventExecutor eventExecutor;

    /** Executor that runs the (blocking) connect task created by start(). */
    @Property
    var ExecutorService processingExecutor;

    /** Path of this device's node in the inventory; initialised by the constructor. */
    @Property
    var InstanceIdentifier path;

    /** Strategy passed to the client for re-establishing the connection. */
    @Property
    var ReconnectStrategy reconnectStrategy;

    /** Caching provider that wraps the per-device schema source delegate. */
    @Property
    var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;

    /** Builds and holds the schema context of the device; created by the connect task. */
    @Property
    private NetconfDeviceSchemaContextProvider deviceContextProvider

    /** Per-device logger, named after the class and the device name. */
    protected val Logger logger

    Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
    Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
    Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg

    /** Device name; also used as the inventory node id. */
    val String name
    MountProvisionService mountService

    /** Retry count passed to NetconfClient.sendMessage for every RPC. */
    int messageRetryCount = 5;

    /** Timeout passed to NetconfClient.sendMessage; presumably milliseconds - TODO confirm against the client. */
    int messageTimeoutCount = 5 * 1000;

    /** Module QNames parsed from the client capabilities; computed once, on first use. */
    Set<QName> cachedCapabilities

    /** Dispatcher used to open the client session. */
    @Property
    var NetconfClientDispatcher dispatcher

    /** Result of schemaSourceProvider.createInstanceFor(delegate); set by the connect task. */
    @Property
    var SchemaSourceProvider<InputStream> remoteSourceProvider

    /** Data broker obtained from the provider session; used to publish the device state. */
    DataBrokerService dataBroker

    /**
     * Creates a device with the given name and derives its logger and inventory path from it.
     *
     * @param name device name, used as the key of the inventory node
     */
    public new(String name) {
        this.name = name;
        this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
        this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
            Collections.singletonMap(INVENTORY_ID, name)).toInstance;
    }

    /**
     * Registers the commit handler (when a mount point is available) and submits the
     * connect task to the processing executor.
     *
     * @return future of the submitted connect task
     * @throws IllegalStateException if the dispatcher, schema source provider, event
     *         executor or processing executor has not been set
     */
    def start() {
        checkState(dispatcher != null, "Dispatcher must be set.");
        checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
        checkState(eventExecutor != null, "Event executor must be set.");
        // Checked up front so that a missing executor cannot leave a registered
        // commit handler behind when the submit below would otherwise fail.
        checkState(processingExecutor != null, "Processing executor must be set.");

        val listener = new NetconfDeviceListener(this, eventExecutor);
        val task = startClientTask(dispatcher, listener)
        if (mountInstance != null) {
            commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
        }
        return processingExecutor.submit(task) as Future<Void>;
    }

    /**
     * Returns the schema context currently known for the device.
     *
     * @return the context held by the device context provider, or absent when the
     *         provider has not been created yet
     */
    def Optional<SchemaContext> getSchemaContext() {
        if (deviceContextProvider == null) {
            return Optional.absent();
        }
        return deviceContextProvider.currentContext;
    }

    /**
     * Builds the task that connects the client, resolves the device schema, publishes
     * the device state and registers the data readers. Any exception is logged and
     * swallowed, so a failed start is only visible in the log.
     */
    private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
        return [ |
            try {
                logger.info("Starting Netconf Client on: {}", socketAddress);
                client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
                logger.debug("Initial capabilities {}", initialCapabilities);
                var SchemaSourceProvider<String> delegate;
                // Schemas can be fetched from the device when either the parsed module
                // capabilities or the raw capability list advertise netconf monitoring.
                if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities) ||
                    client.capabilities.contains(
                        NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) {
                    delegate = new NetconfRemoteSchemaSourceProvider(this);
                } else {
                    logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
                    delegate = SchemaSourceProviders.<String>noopProvider();
                }
                remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
                deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
                deviceContextProvider.createContextFromCapabilities(initialCapabilities);
                if (mountInstance != null && schemaContext.isPresent) {
                    mountInstance.schemaContext = schemaContext.get();
                }
                updateDeviceState()
                // Readers are registered once; both fields are reset by close().
                if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
                    confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
                    operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
                }
            } catch (Exception e) {
                logger.error("Netconf client NOT started. ", e)
            }
        ]
    }

    /**
     * Writes the inventory node of the device (id, connection flag and the raw client
     * capabilities) into the operational data store and waits for the commit to finish.
     */
    private def updateDeviceState() {
        val transaction = dataBroker.beginTransaction

        // 'it' makes the builder the implicit receiver of the calls below.
        val it = ImmutableCompositeNode.builder
        setQName(INVENTORY_NODE)
        addLeaf(INVENTORY_ID, name)
        addLeaf(INVENTORY_CONNECTED, client.clientSession.up)

        logger.debug("Client capabilities {}", client.capabilities)
        for (capability : client.capabilities) {
            addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
        }

        logger.debug("Update device state transaction {} putting operational data started.", transaction.identifier)
        transaction.putOperationalData(path, it.toInstance)
        logger.debug("Update device state transaction {} putting operational data ended.", transaction.identifier)
        val transactionStatus = transaction.commit.get;

        if (transactionStatus.successful) {
            logger.debug("Update device state transaction {} SUCCESSFUL.", transaction.identifier)
        } else {
            // A failed state update is worth more than a debug line.
            logger.warn("Update device state transaction {} FAILED!", transaction.identifier)
            logger.warn("Update device state transaction status {}", transaction.status)
        }
    }

    /**
     * Reads configuration data with a get-config RPC against the running source.
     *
     * @param path identifier of the requested node; also used to build the filter
     * @return the node found under the reply data, or null when the reply carries no
     *         data or the path cannot be resolved in it
     */
    override readConfigurationData(InstanceIdentifier path) {
        val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
            wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
        // A reply without a result yields null instead of a NullPointerException.
        val data = result.result?.getFirstCompositeByName(NETCONF_DATA_QNAME);
        // NOTE(review): findNode may return a SimpleNode for a leaf path, which this cast rejects.
        return data?.findNode(path) as CompositeNode;
    }

    /**
     * Reads operational data with a get RPC.
     *
     * @param path identifier of the requested node; also used to build the filter
     * @return the node found under the reply data, or null when the reply carries no
     *         data or the path cannot be resolved in it
     */
    override readOperationalData(InstanceIdentifier path) {
        val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
        // A reply without a result yields null instead of a NullPointerException.
        val data = result.result?.getFirstCompositeByName(NETCONF_DATA_QNAME);
        // NOTE(review): findNode may return a SimpleNode for a leaf path, which this cast rejects.
        return data?.findNode(path) as CompositeNode;
    }

    /** Returns an empty set; the device does not advertise individual RPCs. */
    override getSupportedRpcs() {
        Collections.emptySet;
    }

    /**
     * Invokes the create-subscription RPC for the given notification stream.
     *
     * @param streamName value of the "stream" leaf of the request
     * @return result of the RPC
     */
    def createSubscription(String streamName) {
        // 'it' makes the builder the implicit receiver of the calls below.
        val it = ImmutableCompositeNode.builder()
        setQName(NETCONF_CREATE_SUBSCRIPTION_QNAME)
        addLeaf("stream", streamName);
        invokeRpc(NETCONF_CREATE_SUBSCRIPTION_QNAME, toInstance())
    }

    /**
     * Converts the RPC to a NETCONF message, sends it and converts the reply back.
     * Failures are logged and rethrown unchanged.
     *
     * @param rpc name of the RPC
     * @param input input of the RPC
     * @return result built from the device reply
     */
    override invokeRpc(QName rpc, CompositeNode input) {
        try {
            val message = rpc.toRpcMessage(input, schemaContext);
            val result = sendMessageImpl(message, messageRetryCount, messageTimeoutCount);
            return result.toRpcResult(rpc, schemaContext);
        } catch (Exception e) {
            logger.error("Rpc was not processed correctly.", e)
            throw e;
        }
    }

    /**
     * Sends one message through the client and validates the reply with
     * NetconfMapping.checkValidReply.
     *
     * @param message request to send
     * @param retryCount retry count handed to the client
     * @param timeout timeout handed to the client
     * @return the reply received from the device
     */
    def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
        logger.debug("Send message {}", XmlUtil.toString(message.document))
        val result = client.sendMessage(message, retryCount, timeout);
        NetconfMapping.checkValidReply(message, result)
        return result;
    }

    /** Returns an empty set; no additional provider functionality is offered. */
    override getProviderFunctionality() {
        Collections.emptySet
    }

    /**
     * Obtains the data broker and mount service from the session, makes sure the
     * inventory node of the device exists in both data stores, and resolves the
     * mount point for the device path.
     *
     * @param session provider session handed over by the broker
     */
    override onSessionInitiated(ProviderSession session) {
        dataBroker = session.getService(DataBrokerService);

        val transaction = dataBroker.beginTransaction
        if (transaction.operationalNodeNotExisting) {
            transaction.putOperationalData(path, nodeWithId)
        }
        if (transaction.configurationNodeNotExisting) {
            transaction.putConfigurationData(path, nodeWithId)
        }
        transaction.commit().get();
        mountService = session.getService(MountProvisionService);
        // The mount service is optional; without it the device has no mount point.
        mountInstance = mountService?.createOrGetMountPoint(path);
    }

    /** Builds the minimal inventory node of the device: a node holding only its id leaf. */
    def getNodeWithId() {
        val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
        return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
    }

    /** Returns true when the transaction reads no configuration data at the device path. */
    def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
        return null === transaction.readConfigurationData(path);
    }

    /** Returns true when the transaction reads no operational data at the device path. */
    def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
        return null === transaction.readOperationalData(path);
    }

    /**
     * Walks the identifier from the given node, one path argument per level.
     *
     * <p>At each level a child is looked up by the node type of the argument, trying
     * in order: composite child, composite child ignoring the revision, simple child,
     * simple child ignoring the revision.
     *
     * @param node node to start from
     * @param identifier path to follow
     * @return the node the path leads to, or null when a level cannot be resolved or
     *         the path continues below a simple node
     */
    static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
        var Node<?> current = node;
        for (arg : identifier.path) {
            if (current instanceof SimpleNode<?>) {
                // A leaf has no children, so the remaining path cannot be resolved.
                return null;
            } else if (current instanceof CompositeNode) {
                val currentComposite = (current as CompositeNode);

                current = currentComposite.getFirstCompositeByName(arg.nodeType);
                if (current == null) {
                    current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
                }
                if (current == null) {
                    current = currentComposite.getFirstSimpleByName(arg.nodeType);
                }
                if (current == null) {
                    current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
                }
                if (current == null) {
                    return null;
                }
            }
        }
        return current;
    }

    /**
     * Creates and prepares a two-phase commit transaction for the modification.
     *
     * @param modification data modification to be applied to the device
     * @return the prepared transaction
     */
    override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
        twoPhaseCommit.prepare()
        return twoPhaseCommit;
    }

    /**
     * Returns the YANG modules advertised by the device as QNames.
     *
     * <p>Only capabilities of the form {@code namespace?...module=...revision=...} are
     * considered. The result is computed on first use and cached afterwards.
     *
     * @return the module QNames, or null when there is no client or it reports no
     *         capabilities
     */
    def getInitialCapabilities() {
        val capabilities = client?.capabilities;
        if (capabilities == null) {
            return null;
        }
        if (cachedCapabilities == null) {
            cachedCapabilities = FluentIterable.from(capabilities).filter[
                contains("?") && contains("module=") && contains("revision=")].transform [
                val parts = split("\\?");
                val namespace = parts.get(0);
                val queryParams = FluentIterable.from(parts.get(1).split("&"));
                var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
                val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
                if (revision === null) {
                    logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
                    // The query was split on '&', so a doubly escaped "&amp;revision=" shows up
                    // here as "amp;revision="; the leading '&' is already gone.
                    revision = queryParams.findFirst[startsWith("amp;revision=")]?.replaceAll("amp;revision=", "");
                    if (revision != null) {
                        logger.warn("Netconf device returned revision incorectly escaped for {}", it)
                    }
                }
                if (revision == null) {
                    return QName.create(URI.create(namespace), null, moduleName);
                }
                return QName.create(namespace, revision, moduleName);
            ].toSet();
        }
        return cachedCapabilities;
    }

    /**
     * Releases everything the device registered or opened: both data readers, the
     * commit handler registered by start(), and the client. The registration fields
     * are cleared afterwards.
     */
    override close() {
        confReaderReg?.close()
        confReaderReg = null
        operReaderReg?.close()
        operReaderReg = null
        commitHandlerReg?.close()
        commitHandlerReg = null
        client?.close()
    }

}
339
/**
 * Builds the schema context of a NETCONF device from the YANG sources that can be
 * resolved for its advertised capabilities, and keeps the most recent result.
 */
package class NetconfDeviceSchemaContextProvider {

    /** Device the context is built for; its logger is used for all messages. */
    @Property
    val NetconfDevice device;

    /** Provider the YANG sources are obtained from. */
    @Property
    val SchemaSourceProvider<InputStream> sourceProvider;

    /** Result of the last createContextFromCapabilities call; absent until one succeeds. */
    @Property
    var Optional<SchemaContext> currentContext;

    /**
     * @param device device the context belongs to
     * @param sourceProvider provider of YANG sources
     */
    new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
        _device = device
        _sourceProvider = sourceProvider
        _currentContext = Optional.absent();
    }

    /**
     * Resolves the sources for the given capabilities and replaces the current
     * context with the one parsed from them. The current context becomes absent
     * when no valid source exists or parsing fails.
     *
     * @param capabilities module QNames advertised by the device
     */
    def createContextFromCapabilities(Iterable<QName> capabilities) {
        val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
        if (!sourceContext.missingSources.empty) {
            device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
        }
        device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
        if (!sourceContext.validSources.empty) {
            // Streams are opened only when they are going to be parsed (and closed) below.
            val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
            val schemaContext = tryToCreateContext(modelsToParse);
            currentContext = Optional.fromNullable(schemaContext);
        } else {
            currentContext = Optional.absent();
        }
        if (currentContext.present) {
            device.logger.debug("Schema context successfully created.");
        }

    }

    /**
     * Parses the given YANG streams into a schema context and closes every stream
     * afterwards, whether or not parsing succeeded.
     *
     * @param modelsToParse open streams of the YANG sources
     * @return the resolved schema context, or null when parsing fails
     */
    def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
        val parser = new YangParserImpl();
        try {
            val models = parser.parseYangModelsFromStreams(modelsToParse);
            return parser.resolveSchemaContext(models);
        } catch (Exception e) {
            // Without a schema the device is only partly usable, so make the cause visible.
            device.logger.warn("Error occured during parsing YANG schemas", e);
            return null;
        } finally {
            for (stream : modelsToParse) {
                try {
                    stream.close();
                } catch (Exception closeFailure) {
                    // A stream that fails to close must not prevent closing the others.
                    device.logger.warn("Error closing YANG source stream {}", stream, closeFailure);
                }
            }
        }
    }
}