2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connect.netconf;
10 import static com.google.common.base.Preconditions.checkState;
11 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
12 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
13 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
14 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
15 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
16 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
17 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
18 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
19 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
20 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
21 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
22 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
24 import com.google.common.base.Preconditions;
25 import java.io.InputStream;
26 import java.net.InetSocketAddress;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.List;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.ExecutorService;
37 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
38 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
39 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
40 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
41 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
42 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
43 import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
44 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
45 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
46 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
47 import org.opendaylight.controller.sal.core.api.Provider;
48 import org.opendaylight.controller.sal.core.api.RpcImplementation;
49 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
50 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
51 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
52 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
53 import org.opendaylight.protocol.framework.ReconnectStrategy;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
56 import org.opendaylight.yangtools.concepts.Registration;
57 import org.opendaylight.yangtools.yang.common.QName;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
60 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
61 import org.opendaylight.yangtools.yang.data.api.Node;
62 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
63 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
64 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
65 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
66 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
67 import org.opendaylight.yangtools.yang.model.api.Module;
68 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
69 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
70 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
71 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
72 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
73 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
77 import com.google.common.base.Function;
78 import com.google.common.base.Optional;
79 import com.google.common.base.Predicate;
80 import com.google.common.collect.FluentIterable;
81 import com.google.common.collect.Iterables;
82 import com.google.common.util.concurrent.ListenableFuture;
83 import io.netty.util.concurrent.EventExecutor;
85 public class NetconfDevice implements Provider, //
86 DataReader<InstanceIdentifier, CompositeNode>, //
87 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
91 InetSocketAddress socketAddress;
93 MountProvisionInstance mountInstance;
95 EventExecutor eventExecutor;
97 ExecutorService processingExecutor;
99 InstanceIdentifier path;
101 ReconnectStrategy reconnectStrategy;
103 AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
105 private NetconfDeviceSchemaContextProvider deviceContextProvider;
107 protected Logger logger;
109 Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
110 Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
111 Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
112 List<RpcRegistration> rpcReg;
116 MountProvisionService mountService;
118 NetconfClientDispatcher dispatcher;
120 static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
122 SchemaSourceProvider<InputStream> remoteSourceProvider;
124 private volatile DataBrokerService dataBroker;
126 NetconfDeviceListener listener;
128 private boolean rollbackSupported;
130 private NetconfReconnectingClientConfiguration clientConfig;
131 private volatile DataProviderService dataProviderService;
133 public NetconfDevice(String name) {
135 this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
136 this.path = InstanceIdentifier.builder(INVENTORY_PATH)
137 .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
140 public void start() {
141 checkState(dispatcher != null, "Dispatcher must be set.");
142 checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
143 checkState(eventExecutor != null, "Event executor must be set.");
145 Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener);
146 listener = (NetconfDeviceListener) clientConfig.getSessionListener();
148 logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
150 dispatcher.createReconnectingClient(clientConfig);
153 Optional<SchemaContext> getSchemaContext() {
154 if (deviceContextProvider == null) {
155 return Optional.absent();
157 return deviceContextProvider.currentContext;
161 if (rpcReg != null) {
162 for (RpcRegistration reg : rpcReg) {
167 closeGracefully(confReaderReg);
168 confReaderReg = null;
169 closeGracefully(operReaderReg);
170 operReaderReg = null;
171 closeGracefully(commitHandlerReg);
172 commitHandlerReg = null;
174 updateDeviceState(false, Collections.<QName> emptySet());
177 private void closeGracefully(final AutoCloseable resource) {
178 if (resource != null) {
181 } catch (Exception e) {
182 logger.warn("Ignoring exception while closing {}", resource, e);
187 void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
188 // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
189 // Reason: delegate.getSchema blocks thread when waiting for response
190 // however, if the netty thread is blocked, no incoming message can be processed
191 // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
192 // TODO redesign +refactor
193 processingExecutor.submit(new Runnable() {
196 NetconfDevice.this.rollbackSupported = rollbackSupported;
197 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
198 deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
199 deviceContextProvider.createContextFromCapabilities(capabilities);
200 if (mountInstance != null && getSchemaContext().isPresent()) {
201 mountInstance.setSchemaContext(getSchemaContext().get());
204 updateDeviceState(true, capabilities);
206 if (mountInstance != null) {
207 confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
208 operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
209 commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
211 List<RpcRegistration> rpcs = new ArrayList<>();
212 // TODO same condition twice
213 if (mountInstance != null && getSchemaContext().isPresent()) {
214 for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
215 rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
224 private void updateDeviceState(boolean up, Set<QName> capabilities) {
225 checkDataStoreState();
227 DataModificationTransaction transaction = dataBroker.beginTransaction();
229 CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
230 it.setQName(INVENTORY_NODE);
231 it.addLeaf(INVENTORY_ID, name);
232 it.addLeaf(INVENTORY_CONNECTED, up);
234 logger.debug("Client capabilities {}", capabilities);
235 for (QName capability : capabilities) {
236 it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString());
239 logger.debug("Update device state transaction " + transaction.getIdentifier()
240 + " putting operational data started.");
241 transaction.removeOperationalData(path);
242 transaction.putOperationalData(path, it.toInstance());
243 logger.debug("Update device state transaction " + transaction.getIdentifier()
244 + " putting operational data ended.");
246 // FIXME: this has to be asynchronous
247 RpcResult<TransactionStatus> transactionStatus = null;
249 transactionStatus = transaction.commit().get();
250 } catch (InterruptedException e) {
251 throw new RuntimeException("Interrupted while waiting for response", e);
252 } catch (ExecutionException e) {
253 throw new RuntimeException("Read configuration data " + path + " failed", e);
255 // TODO better ex handling
257 if (transactionStatus.isSuccessful()) {
258 logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
260 logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
261 logger.debug("Update device state transaction status " + transaction.getStatus());
266 public CompositeNode readConfigurationData(InstanceIdentifier path) {
267 RpcResult<CompositeNode> result = null;
269 result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
270 wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
271 } catch (InterruptedException e) {
272 throw new RuntimeException("Interrupted while waiting for response", e);
273 } catch (ExecutionException e) {
274 throw new RuntimeException("Read configuration data " + path + " failed", e);
277 CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
278 return data == null ? null : (CompositeNode) findNode(data, path);
282 public CompositeNode readOperationalData(InstanceIdentifier path) {
283 RpcResult<CompositeNode> result = null;
285 result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
286 } catch (InterruptedException e) {
287 throw new RuntimeException("Interrupted while waiting for response", e);
288 } catch (ExecutionException e) {
289 throw new RuntimeException("Read configuration data " + path + " failed", e);
292 CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
293 return (CompositeNode) findNode(data, path);
297 public Set<QName> getSupportedRpcs() {
298 return Collections.emptySet();
302 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
303 return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
307 public Collection<ProviderFunctionality> getProviderFunctionality() {
308 return Collections.emptySet();
312 public void onSessionInitiated(ProviderSession session) {
313 dataBroker = session.getService(DataBrokerService.class);
315 processingExecutor.submit(new Runnable() {
318 updateInitialState();
322 mountService = session.getService(MountProvisionService.class);
323 if (mountService != null) {
324 mountInstance = mountService.createOrGetMountPoint(path);
328 private void updateInitialState() {
329 checkDataStoreState();
331 DataModificationTransaction transaction = dataBroker.beginTransaction();
332 if (operationalNodeNotExisting(transaction)) {
333 transaction.putOperationalData(path, getNodeWithId());
335 if (configurationNodeNotExisting(transaction)) {
336 transaction.putConfigurationData(path, getNodeWithId());
340 transaction.commit().get();
341 } catch (InterruptedException e) {
342 throw new RuntimeException("Interrupted while waiting for response", e);
343 } catch (ExecutionException e) {
344 throw new RuntimeException("Read configuration data " + path + " failed", e);
348 private void checkDataStoreState() {
349 // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore
350 dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder(
351 Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build()); }
353 CompositeNode getNodeWithId() {
354 SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
355 return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
358 boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
359 return null == transaction.readConfigurationData(path);
362 boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
363 return null == transaction.readOperationalData(path);
366 static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
368 Node<?> current = node;
369 for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
370 if (current instanceof SimpleNode<?>) {
372 } else if (current instanceof CompositeNode) {
373 CompositeNode currentComposite = (CompositeNode) current;
375 current = currentComposite.getFirstCompositeByName(arg.getNodeType());
376 if (current == null) {
377 current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
379 if (current == null) {
380 current = currentComposite.getFirstSimpleByName(arg.getNodeType());
382 if (current == null) {
383 current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
385 if (current == null) {
394 public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
395 DataModification<InstanceIdentifier, CompositeNode> modification) {
396 NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
397 modification, true, rollbackSupported);
399 twoPhaseCommit.prepare();
400 } catch (InterruptedException e) {
401 throw new RuntimeException("Interrupted while waiting for response", e);
402 } catch (ExecutionException e) {
403 throw new RuntimeException("Read configuration data " + path + " failed", e);
405 return twoPhaseCommit;
408 Set<QName> getCapabilities(Collection<String> capabilities) {
409 return FluentIterable.from(capabilities).filter(new Predicate<String>() {
411 public boolean apply(final String capability) {
412 return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
414 }).transform(new Function<String, QName>() {
416 public QName apply(final String capability) {
417 String[] parts = capability.split("\\?");
418 String namespace = parts[0];
419 FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
421 String revision = getStringAndTransform(queryParams, "revision=", "revision=");
423 String moduleName = getStringAndTransform(queryParams, "module=", "module=");
425 if (revision == null) {
426 logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
427 revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
429 if (revision != null) {
430 logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
433 if (revision == null) {
434 return QName.create(URI.create(namespace), null, moduleName);
436 return QName.create(namespace, revision, moduleName);
439 private String getStringAndTransform(final Iterable<String> queryParams, final String match,
440 final String substringToRemove) {
441 Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
443 public boolean apply(final String input) {
444 return input.startsWith(match);
448 return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
455 public void close() {
459 public String getName() {
463 public InetSocketAddress getSocketAddress() {
464 return socketAddress;
467 public MountProvisionInstance getMountInstance() {
468 return mountInstance;
471 public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
472 this.reconnectStrategy = reconnectStrategy;
475 public void setProcessingExecutor(final ExecutorService processingExecutor) {
476 this.processingExecutor = processingExecutor;
479 public void setSocketAddress(final InetSocketAddress socketAddress) {
480 this.socketAddress = socketAddress;
483 public void setEventExecutor(final EventExecutor eventExecutor) {
484 this.eventExecutor = eventExecutor;
487 public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
488 this.schemaSourceProvider = schemaSourceProvider;
491 public void setDispatcher(final NetconfClientDispatcher dispatcher) {
492 this.dispatcher = dispatcher;
495 public void setClientConfig(final NetconfReconnectingClientConfiguration clientConfig) {
496 this.clientConfig = clientConfig;
499 public void setDataProviderService(final DataProviderService dataProviderService) {
500 this.dataProviderService = dataProviderService;
504 class NetconfDeviceSchemaContextProvider {
506 NetconfDevice device;
508 SchemaSourceProvider<InputStream> sourceProvider;
510 Optional<SchemaContext> currentContext;
512 NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
513 this.device = device;
514 this.sourceProvider = sourceProvider;
515 this.currentContext = Optional.absent();
518 void createContextFromCapabilities(Iterable<QName> capabilities) {
519 YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
520 if (!sourceContext.getMissingSources().isEmpty()) {
521 device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
523 device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
524 List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
525 if (!sourceContext.getValidSources().isEmpty()) {
526 SchemaContext schemaContext = tryToCreateContext(modelsToParse);
527 currentContext = Optional.fromNullable(schemaContext);
529 currentContext = Optional.absent();
531 if (currentContext.isPresent()) {
532 device.logger.debug("Schema context successfully created.");
536 SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
537 YangParserImpl parser = new YangParserImpl();
540 Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
541 return parser.resolveSchemaContext(models);
542 } catch (Exception e) {
543 device.logger.debug("Error occured during parsing YANG schemas", e);