BUG 1001 Failing test - RestGetOperationTest.getStreamsTest
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
12 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
13 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
14 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
15 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
16 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
17 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
18 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
19 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
20 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
21 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
22 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
23
24 import com.google.common.base.Preconditions;
25 import java.io.InputStream;
26 import java.net.InetSocketAddress;
27 import java.net.URI;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Set;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.ExecutorService;
36
37 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
38 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
39 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
40 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
41 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
42 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
43 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
44 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
45 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
46 import org.opendaylight.controller.sal.core.api.Provider;
47 import org.opendaylight.controller.sal.core.api.RpcImplementation;
48 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
49 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
50 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
51 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
52 import org.opendaylight.protocol.framework.ReconnectStrategy;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
55 import org.opendaylight.yangtools.concepts.Registration;
56 import org.opendaylight.yangtools.yang.common.QName;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
59 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
60 import org.opendaylight.yangtools.yang.data.api.Node;
61 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
62 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
63 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
64 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
65 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
66 import org.opendaylight.yangtools.yang.model.api.Module;
67 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
68 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
69 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
70 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
71 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
72 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
75
76 import com.google.common.base.Function;
77 import com.google.common.base.Optional;
78 import com.google.common.base.Predicate;
79 import com.google.common.collect.FluentIterable;
80 import com.google.common.collect.Iterables;
81 import com.google.common.util.concurrent.ListenableFuture;
82 import io.netty.util.concurrent.EventExecutor;
83
84 public class NetconfDevice implements Provider, //
85         DataReader<InstanceIdentifier, CompositeNode>, //
86         DataCommitHandler<InstanceIdentifier, CompositeNode>, //
87         RpcImplementation, //
88         AutoCloseable {
89
90     InetSocketAddress socketAddress;
91
92     MountProvisionInstance mountInstance;
93
94     EventExecutor eventExecutor;
95
96     ExecutorService processingExecutor;
97
98     InstanceIdentifier path;
99
100     ReconnectStrategy reconnectStrategy;
101
102     AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
103
104     private NetconfDeviceSchemaContextProvider deviceContextProvider;
105
106     protected Logger logger;
107
108     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
109     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
110     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
111     List<RpcRegistration> rpcReg;
112
113     String name;
114
115     MountProvisionService mountService;
116
117     NetconfClientDispatcher dispatcher;
118
119     static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
120
121     SchemaSourceProvider<InputStream> remoteSourceProvider;
122
123     private volatile DataBrokerService dataBroker;
124
125     NetconfDeviceListener listener;
126
127     private boolean rollbackSupported;
128
129     private NetconfClientConfiguration clientConfig;
130     private volatile DataProviderService dataProviderService;
131
132     public NetconfDevice(String name) {
133         this.name = name;
134         this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
135         this.path = InstanceIdentifier.builder(INVENTORY_PATH)
136                 .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
137     }
138
139     public void start() {
140         checkState(dispatcher != null, "Dispatcher must be set.");
141         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
142         checkState(eventExecutor != null, "Event executor must be set.");
143
144         Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener);
145         listener = (NetconfDeviceListener) clientConfig.getSessionListener();
146
147         logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
148
149         dispatcher.createClient(clientConfig);
150     }
151
152     Optional<SchemaContext> getSchemaContext() {
153         if (deviceContextProvider == null) {
154             return Optional.absent();
155         }
156         return deviceContextProvider.currentContext;
157     }
158
159     void bringDown() {
160         if (rpcReg != null) {
161             for (RpcRegistration reg : rpcReg) {
162                 reg.close();
163             }
164             rpcReg = null;
165         }
166         closeGracefully(confReaderReg);
167         confReaderReg = null;
168         closeGracefully(operReaderReg);
169         operReaderReg = null;
170         closeGracefully(commitHandlerReg);
171         commitHandlerReg = null;
172
173         updateDeviceState(false, Collections.<QName> emptySet());
174     }
175
176     private void closeGracefully(final AutoCloseable resource) {
177         if (resource != null) {
178             try {
179                 resource.close();
180             } catch (Exception e) {
181                 logger.warn("Ignoring exception while closing {}", resource, e);
182             }
183         }
184     }
185
186     void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
187         // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
188         // Reason: delegate.getSchema blocks thread when waiting for response
189         // however, if the netty thread is blocked, no incoming message can be processed
190         // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
191         // TODO redesign +refactor
192         processingExecutor.submit(new Runnable() {
193             @Override
194             public void run() {
195                 NetconfDevice.this.rollbackSupported = rollbackSupported;
196                 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
197                 deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
198                 deviceContextProvider.createContextFromCapabilities(capabilities);
199                 if (mountInstance != null && getSchemaContext().isPresent()) {
200                     mountInstance.setSchemaContext(getSchemaContext().get());
201                 }
202
203                 updateDeviceState(true, capabilities);
204
205                 if (mountInstance != null) {
206                     confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
207                     operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
208                     commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
209
210                     List<RpcRegistration> rpcs = new ArrayList<>();
211                     // TODO same condition twice
212                     if (mountInstance != null && getSchemaContext().isPresent()) {
213                         for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
214                             rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
215                         }
216                     }
217                     rpcReg = rpcs;
218                 }
219             }
220         });
221     }
222
223     private void updateDeviceState(boolean up, Set<QName> capabilities) {
224         checkDataStoreState();
225
226         DataModificationTransaction transaction = dataBroker.beginTransaction();
227
228         CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
229         it.setQName(INVENTORY_NODE);
230         it.addLeaf(INVENTORY_ID, name);
231         it.addLeaf(INVENTORY_CONNECTED, up);
232
233         logger.debug("Client capabilities {}", capabilities);
234         for (QName capability : capabilities) {
235             it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString());
236         }
237
238         logger.debug("Update device state transaction " + transaction.getIdentifier()
239                 + " putting operational data started.");
240         transaction.removeOperationalData(path);
241         transaction.putOperationalData(path, it.toInstance());
242         logger.debug("Update device state transaction " + transaction.getIdentifier()
243                 + " putting operational data ended.");
244
245         // FIXME: this has to be asynchronous
246         RpcResult<TransactionStatus> transactionStatus = null;
247         try {
248             transactionStatus = transaction.commit().get();
249         } catch (InterruptedException e) {
250             throw new RuntimeException("Interrupted while waiting for response", e);
251         } catch (ExecutionException e) {
252             throw new RuntimeException("Read configuration data " + path + " failed", e);
253         }
254         // TODO better ex handling
255
256         if (transactionStatus.isSuccessful()) {
257             logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
258         } else {
259             logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
260             logger.debug("Update device state transaction status " + transaction.getStatus());
261         }
262     }
263
264     @Override
265     public CompositeNode readConfigurationData(InstanceIdentifier path) {
266         RpcResult<CompositeNode> result = null;
267         try {
268             result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
269                     wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
270         } catch (InterruptedException e) {
271             throw new RuntimeException("Interrupted while waiting for response", e);
272         } catch (ExecutionException e) {
273             throw new RuntimeException("Read configuration data " + path + " failed", e);
274         }
275
276         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
277         return data == null ? null : (CompositeNode) findNode(data, path);
278     }
279
280     @Override
281     public CompositeNode readOperationalData(InstanceIdentifier path) {
282         RpcResult<CompositeNode> result = null;
283         try {
284             result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
285         } catch (InterruptedException e) {
286             throw new RuntimeException("Interrupted while waiting for response", e);
287         } catch (ExecutionException e) {
288             throw new RuntimeException("Read configuration data " + path + " failed", e);
289         }
290
291         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
292         return (CompositeNode) findNode(data, path);
293     }
294
295     @Override
296     public Set<QName> getSupportedRpcs() {
297         return Collections.emptySet();
298     }
299
300     @Override
301     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
302         return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
303     }
304
305     @Override
306     public Collection<ProviderFunctionality> getProviderFunctionality() {
307         return Collections.emptySet();
308     }
309
310     @Override
311     public void onSessionInitiated(ProviderSession session) {
312         dataBroker = session.getService(DataBrokerService.class);
313
314         processingExecutor.submit(new Runnable() {
315             @Override
316             public void run() {
317                 updateInitialState();
318             }
319         });
320
321         mountService = session.getService(MountProvisionService.class);
322         if (mountService != null) {
323             mountInstance = mountService.createOrGetMountPoint(path);
324         }
325     }
326
327     private void updateInitialState() {
328         checkDataStoreState();
329
330         DataModificationTransaction transaction = dataBroker.beginTransaction();
331         if (operationalNodeNotExisting(transaction)) {
332             transaction.putOperationalData(path, getNodeWithId());
333         }
334         if (configurationNodeNotExisting(transaction)) {
335             transaction.putConfigurationData(path, getNodeWithId());
336         }
337
338         try {
339             transaction.commit().get();
340         } catch (InterruptedException e) {
341             throw new RuntimeException("Interrupted while waiting for response", e);
342         } catch (ExecutionException e) {
343             throw new RuntimeException("Read configuration data " + path + " failed", e);
344         }
345     }
346
347     private void checkDataStoreState() {
348         // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore
349         dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder(
350                 Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build());    }
351
352     CompositeNode getNodeWithId() {
353         SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
354         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
355     }
356
357     boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
358         return null == transaction.readConfigurationData(path);
359     }
360
361     boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
362         return null == transaction.readOperationalData(path);
363     }
364
365     static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
366
367         Node<?> current = node;
368         for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
369             if (current instanceof SimpleNode<?>) {
370                 return null;
371             } else if (current instanceof CompositeNode) {
372                 CompositeNode currentComposite = (CompositeNode) current;
373
374                 current = currentComposite.getFirstCompositeByName(arg.getNodeType());
375                 if (current == null) {
376                     current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
377                 }
378                 if (current == null) {
379                     current = currentComposite.getFirstSimpleByName(arg.getNodeType());
380                 }
381                 if (current == null) {
382                     current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
383                 }
384                 if (current == null) {
385                     return null;
386                 }
387             }
388         }
389         return current;
390     }
391
392     @Override
393     public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
394             DataModification<InstanceIdentifier, CompositeNode> modification) {
395         NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
396                 modification, true, rollbackSupported);
397         try {
398             twoPhaseCommit.prepare();
399         } catch (InterruptedException e) {
400             throw new RuntimeException("Interrupted while waiting for response", e);
401         } catch (ExecutionException e) {
402             throw new RuntimeException("Read configuration data " + path + " failed", e);
403         }
404          return twoPhaseCommit;
405     }
406
407     Set<QName> getCapabilities(Collection<String> capabilities) {
408         return FluentIterable.from(capabilities).filter(new Predicate<String>() {
409             @Override
410             public boolean apply(final String capability) {
411                 return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
412             }
413         }).transform(new Function<String, QName>() {
414             @Override
415             public QName apply(final String capability) {
416                 String[] parts = capability.split("\\?");
417                 String namespace = parts[0];
418                 FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
419
420                 String revision = getStringAndTransform(queryParams, "revision=", "revision=");
421
422                 String moduleName = getStringAndTransform(queryParams, "module=", "module=");
423
424                 if (revision == null) {
425                     logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
426                     revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
427
428                     if (revision != null) {
429                         logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
430                     }
431                 }
432                 if (revision == null) {
433                     return QName.create(URI.create(namespace), null, moduleName);
434                 }
435                 return QName.create(namespace, revision, moduleName);
436             }
437
438             private String getStringAndTransform(final Iterable<String> queryParams, final String match,
439                     final String substringToRemove) {
440                 Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
441                     @Override
442                     public boolean apply(final String input) {
443                         return input.startsWith(match);
444                     }
445                 });
446
447                 return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
448             }
449
450         }).toSet();
451     }
452
453     @Override
454     public void close() {
455         bringDown();
456     }
457
458     public String getName() {
459         return name;
460     }
461
462     public InetSocketAddress getSocketAddress() {
463         return socketAddress;
464     }
465
466     public MountProvisionInstance getMountInstance() {
467         return mountInstance;
468     }
469
470     public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
471         this.reconnectStrategy = reconnectStrategy;
472     }
473
474     public void setProcessingExecutor(final ExecutorService processingExecutor) {
475         this.processingExecutor = processingExecutor;
476     }
477
478     public void setSocketAddress(final InetSocketAddress socketAddress) {
479         this.socketAddress = socketAddress;
480     }
481
482     public void setEventExecutor(final EventExecutor eventExecutor) {
483         this.eventExecutor = eventExecutor;
484     }
485
486     public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
487         this.schemaSourceProvider = schemaSourceProvider;
488     }
489
490     public void setDispatcher(final NetconfClientDispatcher dispatcher) {
491         this.dispatcher = dispatcher;
492     }
493
494     public void setClientConfig(final NetconfClientConfiguration clientConfig) {
495         this.clientConfig = clientConfig;
496     }
497
498     public void setDataProviderService(final DataProviderService dataProviderService) {
499         this.dataProviderService = dataProviderService;
500     }
501 }
502
503 class NetconfDeviceSchemaContextProvider {
504
505     NetconfDevice device;
506
507     SchemaSourceProvider<InputStream> sourceProvider;
508
509     Optional<SchemaContext> currentContext;
510
511     NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
512         this.device = device;
513         this.sourceProvider = sourceProvider;
514         this.currentContext = Optional.absent();
515     }
516
517     void createContextFromCapabilities(Iterable<QName> capabilities) {
518         YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
519         if (!sourceContext.getMissingSources().isEmpty()) {
520             device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
521         }
522         device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
523         List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
524         if (!sourceContext.getValidSources().isEmpty()) {
525             SchemaContext schemaContext = tryToCreateContext(modelsToParse);
526             currentContext = Optional.fromNullable(schemaContext);
527         } else {
528             currentContext = Optional.absent();
529         }
530         if (currentContext.isPresent()) {
531             device.logger.debug("Schema context successfully created.");
532         }
533     }
534
535     SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
536         YangParserImpl parser = new YangParserImpl();
537         try {
538
539             Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
540             return parser.resolveSchemaContext(models);
541         } catch (Exception e) {
542             device.logger.debug("Error occured during parsing YANG schemas", e);
543             return null;
544         }
545     }
546 }