Bug 849: Fixed NPE in Translated Data Change Events.
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
12 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
13 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
14 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
15 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
16 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
17 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
18 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
19 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
20 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
21 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
22 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
23
24 import com.google.common.base.Preconditions;
25 import java.io.InputStream;
26 import java.net.InetSocketAddress;
27 import java.net.URI;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Set;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.ExecutorService;
36
37 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
38 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
39 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
40 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
41 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
42 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
43 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
44 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
45 import org.opendaylight.controller.sal.core.api.Provider;
46 import org.opendaylight.controller.sal.core.api.RpcImplementation;
47 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
48 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
49 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
50 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
51 import org.opendaylight.protocol.framework.ReconnectStrategy;
52 import org.opendaylight.yangtools.concepts.Registration;
53 import org.opendaylight.yangtools.yang.common.QName;
54 import org.opendaylight.yangtools.yang.common.RpcResult;
55 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
56 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.Node;
58 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
59 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
60 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
61 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
62 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
63 import org.opendaylight.yangtools.yang.model.api.Module;
64 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
65 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
66 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
67 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
68 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
69 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73 import com.google.common.base.Function;
74 import com.google.common.base.Optional;
75 import com.google.common.base.Predicate;
76 import com.google.common.collect.FluentIterable;
77 import com.google.common.collect.Iterables;
78 import com.google.common.util.concurrent.ListenableFuture;
79 import io.netty.util.concurrent.EventExecutor;
80
81 public class NetconfDevice implements Provider, //
82         DataReader<InstanceIdentifier, CompositeNode>, //
83         DataCommitHandler<InstanceIdentifier, CompositeNode>, //
84         RpcImplementation, //
85         AutoCloseable {
86
87     InetSocketAddress socketAddress;
88
89     MountProvisionInstance mountInstance;
90
91     EventExecutor eventExecutor;
92
93     ExecutorService processingExecutor;
94
95     InstanceIdentifier path;
96
97     ReconnectStrategy reconnectStrategy;
98
99     AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
100
101     private NetconfDeviceSchemaContextProvider deviceContextProvider;
102
103     protected Logger logger;
104
105     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
106     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
107     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
108     List<RpcRegistration> rpcReg;
109
110     String name;
111
112     MountProvisionService mountService;
113
114     NetconfClientDispatcher dispatcher;
115
116     static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
117
118     SchemaSourceProvider<InputStream> remoteSourceProvider;
119
120     DataBrokerService dataBroker;
121
122     NetconfDeviceListener listener;
123
124     private boolean rollbackSupported;
125
126     private NetconfClientConfiguration clientConfig;
127
128     public NetconfDevice(String name) {
129         this.name = name;
130         this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
131         this.path = InstanceIdentifier.builder(INVENTORY_PATH)
132                 .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
133     }
134
135     public void start() {
136         checkState(dispatcher != null, "Dispatcher must be set.");
137         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
138         checkState(eventExecutor != null, "Event executor must be set.");
139
140         Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener);
141         listener = (NetconfDeviceListener) clientConfig.getSessionListener();
142
143         logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
144
145         dispatcher.createClient(clientConfig);
146     }
147
148     Optional<SchemaContext> getSchemaContext() {
149         if (deviceContextProvider == null) {
150             return Optional.absent();
151         }
152         return deviceContextProvider.currentContext;
153     }
154
155     void bringDown() {
156         if (rpcReg != null) {
157             for (RpcRegistration reg : rpcReg) {
158                 reg.close();
159             }
160             rpcReg = null;
161         }
162         closeGracefully(confReaderReg);
163         confReaderReg = null;
164         closeGracefully(operReaderReg);
165         operReaderReg = null;
166         closeGracefully(commitHandlerReg);
167         commitHandlerReg = null;
168
169         updateDeviceState(false, Collections.<QName> emptySet());
170     }
171
172     private void closeGracefully(final AutoCloseable resource) {
173         if (resource != null) {
174             try {
175                 resource.close();
176             } catch (Exception e) {
177                 logger.warn("Ignoring exception while closing {}", resource, e);
178             }
179         }
180     }
181
182     void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
183         // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
184         // Reason: delegate.getSchema blocks thread when waiting for response
185         // however, if the netty thread is blocked, no incoming message can be processed
186         // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
187         // TODO redesign +refactor
188         processingExecutor.submit(new Runnable() {
189             @Override
190             public void run() {
191                 NetconfDevice.this.rollbackSupported = rollbackSupported;
192                 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
193                 deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
194                 deviceContextProvider.createContextFromCapabilities(capabilities);
195                 if (mountInstance != null && getSchemaContext().isPresent()) {
196                     mountInstance.setSchemaContext(getSchemaContext().get());
197                 }
198
199                 updateDeviceState(true, capabilities);
200
201                 if (mountInstance != null) {
202                     confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
203                     operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
204                     commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
205
206                     List<RpcRegistration> rpcs = new ArrayList<>();
207                     // TODO same condition twice
208                     if (mountInstance != null && getSchemaContext().isPresent()) {
209                         for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
210                             rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
211                         }
212                     }
213                     rpcReg = rpcs;
214                 }
215             }
216         });
217     }
218
219     private void updateDeviceState(boolean up, Set<QName> capabilities) {
220         DataModificationTransaction transaction = dataBroker.beginTransaction();
221
222         CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
223         it.setQName(INVENTORY_NODE);
224         it.addLeaf(INVENTORY_ID, name);
225         it.addLeaf(INVENTORY_CONNECTED, up);
226
227         logger.debug("Client capabilities {}", capabilities);
228         for (QName capability : capabilities) {
229             it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString());
230         }
231
232         logger.debug("Update device state transaction " + transaction.getIdentifier()
233                 + " putting operational data started.");
234         transaction.removeOperationalData(path);
235         transaction.putOperationalData(path, it.toInstance());
236         logger.debug("Update device state transaction " + transaction.getIdentifier()
237                 + " putting operational data ended.");
238
239         // FIXME: this has to be asynchronous
240         RpcResult<TransactionStatus> transactionStatus = null;
241         try {
242             transactionStatus = transaction.commit().get();
243         } catch (InterruptedException e) {
244             throw new RuntimeException("Interrupted while waiting for response", e);
245         } catch (ExecutionException e) {
246             throw new RuntimeException("Read configuration data " + path + " failed", e);
247         }
248         // TODO better ex handling
249
250         if (transactionStatus.isSuccessful()) {
251             logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
252         } else {
253             logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
254             logger.debug("Update device state transaction status " + transaction.getStatus());
255         }
256     }
257
258     @Override
259     public CompositeNode readConfigurationData(InstanceIdentifier path) {
260         RpcResult<CompositeNode> result = null;
261         try {
262             result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
263                     wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
264         } catch (InterruptedException e) {
265             throw new RuntimeException("Interrupted while waiting for response", e);
266         } catch (ExecutionException e) {
267             throw new RuntimeException("Read configuration data " + path + " failed", e);
268         }
269
270         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
271         return data == null ? null : (CompositeNode) findNode(data, path);
272     }
273
274     @Override
275     public CompositeNode readOperationalData(InstanceIdentifier path) {
276         RpcResult<CompositeNode> result = null;
277         try {
278             result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
279         } catch (InterruptedException e) {
280             throw new RuntimeException("Interrupted while waiting for response", e);
281         } catch (ExecutionException e) {
282             throw new RuntimeException("Read configuration data " + path + " failed", e);
283         }
284
285         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
286         return (CompositeNode) findNode(data, path);
287     }
288
289     @Override
290     public Set<QName> getSupportedRpcs() {
291         return Collections.emptySet();
292     }
293
294     @Override
295     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
296         return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
297     }
298
299     @Override
300     public Collection<ProviderFunctionality> getProviderFunctionality() {
301         return Collections.emptySet();
302     }
303
304     @Override
305     public void onSessionInitiated(ProviderSession session) {
306         dataBroker = session.getService(DataBrokerService.class);
307
308         DataModificationTransaction transaction = dataBroker.beginTransaction();
309         if (operationalNodeNotExisting(transaction)) {
310             transaction.putOperationalData(path, getNodeWithId());
311         }
312         if (configurationNodeNotExisting(transaction)) {
313             transaction.putConfigurationData(path, getNodeWithId());
314         }
315
316         try {
317             transaction.commit().get();
318         } catch (InterruptedException e) {
319             throw new RuntimeException("Interrupted while waiting for response", e);
320         } catch (ExecutionException e) {
321             throw new RuntimeException("Read configuration data " + path + " failed", e);
322         }
323
324         mountService = session.getService(MountProvisionService.class);
325         if (mountService != null) {
326             mountInstance = mountService.createOrGetMountPoint(path);
327         }
328     }
329
330     CompositeNode getNodeWithId() {
331         SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
332         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
333     }
334
335     boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
336         return null == transaction.readConfigurationData(path);
337     }
338
339     boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
340         return null == transaction.readOperationalData(path);
341     }
342
343     static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
344
345         Node<?> current = node;
346         for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
347             if (current instanceof SimpleNode<?>) {
348                 return null;
349             } else if (current instanceof CompositeNode) {
350                 CompositeNode currentComposite = (CompositeNode) current;
351
352                 current = currentComposite.getFirstCompositeByName(arg.getNodeType());
353                 if (current == null) {
354                     current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
355                 }
356                 if (current == null) {
357                     current = currentComposite.getFirstSimpleByName(arg.getNodeType());
358                 }
359                 if (current == null) {
360                     current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
361                 }
362                 if (current == null) {
363                     return null;
364                 }
365             }
366         }
367         return current;
368     }
369
370     @Override
371     public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
372             DataModification<InstanceIdentifier, CompositeNode> modification) {
373         NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
374                 modification, true, rollbackSupported);
375         try {
376             twoPhaseCommit.prepare();
377         } catch (InterruptedException e) {
378             throw new RuntimeException("Interrupted while waiting for response", e);
379         } catch (ExecutionException e) {
380             throw new RuntimeException("Read configuration data " + path + " failed", e);
381         }
382          return twoPhaseCommit;
383     }
384
385     Set<QName> getCapabilities(Collection<String> capabilities) {
386         return FluentIterable.from(capabilities).filter(new Predicate<String>() {
387             @Override
388             public boolean apply(final String capability) {
389                 return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
390             }
391         }).transform(new Function<String, QName>() {
392             @Override
393             public QName apply(final String capability) {
394                 String[] parts = capability.split("\\?");
395                 String namespace = parts[0];
396                 FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
397
398                 String revision = getStringAndTransform(queryParams, "revision=", "revision=");
399
400                 String moduleName = getStringAndTransform(queryParams, "module=", "module=");
401
402                 if (revision == null) {
403                     logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
404                     revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
405
406                     if (revision != null) {
407                         logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
408                     }
409                 }
410                 if (revision == null) {
411                     return QName.create(URI.create(namespace), null, moduleName);
412                 }
413                 return QName.create(namespace, revision, moduleName);
414             }
415
416             private String getStringAndTransform(final Iterable<String> queryParams, final String match,
417                     final String substringToRemove) {
418                 Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
419                     @Override
420                     public boolean apply(final String input) {
421                         return input.startsWith(match);
422                     }
423                 });
424
425                 return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
426             }
427
428         }).toSet();
429     }
430
431     @Override
432     public void close() {
433         bringDown();
434     }
435
436     public String getName() {
437         return name;
438     }
439
440     public InetSocketAddress getSocketAddress() {
441         return socketAddress;
442     }
443
444     public MountProvisionInstance getMountInstance() {
445         return mountInstance;
446     }
447
448     public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
449         this.reconnectStrategy = reconnectStrategy;
450     }
451
452     public void setProcessingExecutor(final ExecutorService processingExecutor) {
453         this.processingExecutor = processingExecutor;
454     }
455
456     public void setSocketAddress(final InetSocketAddress socketAddress) {
457         this.socketAddress = socketAddress;
458     }
459
460     public void setEventExecutor(final EventExecutor eventExecutor) {
461         this.eventExecutor = eventExecutor;
462     }
463
464     public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
465         this.schemaSourceProvider = schemaSourceProvider;
466     }
467
468     public void setDispatcher(final NetconfClientDispatcher dispatcher) {
469         this.dispatcher = dispatcher;
470     }
471
472     public void setClientConfig(final NetconfClientConfiguration clientConfig) {
473         this.clientConfig = clientConfig;
474     }
475
476 }
477
478 class NetconfDeviceSchemaContextProvider {
479
480     NetconfDevice device;
481
482     SchemaSourceProvider<InputStream> sourceProvider;
483
484     Optional<SchemaContext> currentContext;
485
486     NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
487         this.device = device;
488         this.sourceProvider = sourceProvider;
489         this.currentContext = Optional.absent();
490     }
491
492     void createContextFromCapabilities(Iterable<QName> capabilities) {
493         YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
494         if (!sourceContext.getMissingSources().isEmpty()) {
495             device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
496         }
497         device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
498         List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
499         if (!sourceContext.getValidSources().isEmpty()) {
500             SchemaContext schemaContext = tryToCreateContext(modelsToParse);
501             currentContext = Optional.fromNullable(schemaContext);
502         } else {
503             currentContext = Optional.absent();
504         }
505         if (currentContext.isPresent()) {
506             device.logger.debug("Schema context successfully created.");
507         }
508     }
509
510     SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
511         YangParserImpl parser = new YangParserImpl();
512         try {
513
514             Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
515             return parser.resolveSchemaContext(models);
516         } catch (Exception e) {
517             device.logger.debug("Error occured during parsing YANG schemas", e);
518             return null;
519         }
520     }
521 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.