2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connect.netconf;
10 import static com.google.common.base.Preconditions.checkState;
11 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
12 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
13 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
14 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
15 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
16 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
17 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
18 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
19 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
20 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
21 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
22 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
24 import com.google.common.base.Preconditions;
25 import java.io.InputStream;
26 import java.net.InetSocketAddress;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.List;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.ExecutorService;
37 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
38 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
39 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
40 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
41 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
42 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
43 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
44 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
45 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
46 import org.opendaylight.controller.sal.core.api.Provider;
47 import org.opendaylight.controller.sal.core.api.RpcImplementation;
48 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
49 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
50 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
51 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
52 import org.opendaylight.protocol.framework.ReconnectStrategy;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
55 import org.opendaylight.yangtools.concepts.Registration;
56 import org.opendaylight.yangtools.yang.common.QName;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
59 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
60 import org.opendaylight.yangtools.yang.data.api.Node;
61 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
62 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
63 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
64 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
65 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
66 import org.opendaylight.yangtools.yang.model.api.Module;
67 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
68 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
69 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
70 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
71 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
72 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
76 import com.google.common.base.Function;
77 import com.google.common.base.Optional;
78 import com.google.common.base.Predicate;
79 import com.google.common.collect.FluentIterable;
80 import com.google.common.collect.Iterables;
81 import com.google.common.util.concurrent.ListenableFuture;
82 import io.netty.util.concurrent.EventExecutor;
84 public class NetconfDevice implements Provider, //
85 DataReader<InstanceIdentifier, CompositeNode>, //
86 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
90 InetSocketAddress socketAddress;
92 MountProvisionInstance mountInstance;
94 EventExecutor eventExecutor;
96 ExecutorService processingExecutor;
98 InstanceIdentifier path;
100 ReconnectStrategy reconnectStrategy;
102 AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
104 private NetconfDeviceSchemaContextProvider deviceContextProvider;
106 protected Logger logger;
108 Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
109 Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
110 Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
111 List<RpcRegistration> rpcReg;
115 MountProvisionService mountService;
117 NetconfClientDispatcher dispatcher;
119 static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
121 SchemaSourceProvider<InputStream> remoteSourceProvider;
123 private volatile DataBrokerService dataBroker;
125 NetconfDeviceListener listener;
127 private boolean rollbackSupported;
129 private NetconfClientConfiguration clientConfig;
130 private volatile DataProviderService dataProviderService;
132 public NetconfDevice(String name) {
134 this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
135 this.path = InstanceIdentifier.builder(INVENTORY_PATH)
136 .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
139 public void start() {
140 checkState(dispatcher != null, "Dispatcher must be set.");
141 checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
142 checkState(eventExecutor != null, "Event executor must be set.");
144 Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener);
145 listener = (NetconfDeviceListener) clientConfig.getSessionListener();
147 logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
149 dispatcher.createClient(clientConfig);
152 Optional<SchemaContext> getSchemaContext() {
153 if (deviceContextProvider == null) {
154 return Optional.absent();
156 return deviceContextProvider.currentContext;
160 if (rpcReg != null) {
161 for (RpcRegistration reg : rpcReg) {
166 closeGracefully(confReaderReg);
167 confReaderReg = null;
168 closeGracefully(operReaderReg);
169 operReaderReg = null;
170 closeGracefully(commitHandlerReg);
171 commitHandlerReg = null;
173 updateDeviceState(false, Collections.<QName> emptySet());
176 private void closeGracefully(final AutoCloseable resource) {
177 if (resource != null) {
180 } catch (Exception e) {
181 logger.warn("Ignoring exception while closing {}", resource, e);
186 void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
187 // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
188 // Reason: delegate.getSchema blocks thread when waiting for response
189 // however, if the netty thread is blocked, no incoming message can be processed
190 // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
191 // TODO redesign +refactor
192 processingExecutor.submit(new Runnable() {
195 NetconfDevice.this.rollbackSupported = rollbackSupported;
196 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
197 deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
198 deviceContextProvider.createContextFromCapabilities(capabilities);
199 if (mountInstance != null && getSchemaContext().isPresent()) {
200 mountInstance.setSchemaContext(getSchemaContext().get());
203 updateDeviceState(true, capabilities);
205 if (mountInstance != null) {
206 confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
207 operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
208 commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
210 List<RpcRegistration> rpcs = new ArrayList<>();
211 // TODO same condition twice
212 if (mountInstance != null && getSchemaContext().isPresent()) {
213 for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
214 rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
223 private void updateDeviceState(boolean up, Set<QName> capabilities) {
224 checkDataStoreState();
226 DataModificationTransaction transaction = dataBroker.beginTransaction();
228 CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
229 it.setQName(INVENTORY_NODE);
230 it.addLeaf(INVENTORY_ID, name);
231 it.addLeaf(INVENTORY_CONNECTED, up);
233 logger.debug("Client capabilities {}", capabilities);
234 for (QName capability : capabilities) {
235 it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString());
238 logger.debug("Update device state transaction " + transaction.getIdentifier()
239 + " putting operational data started.");
240 transaction.removeOperationalData(path);
241 transaction.putOperationalData(path, it.toInstance());
242 logger.debug("Update device state transaction " + transaction.getIdentifier()
243 + " putting operational data ended.");
245 // FIXME: this has to be asynchronous
246 RpcResult<TransactionStatus> transactionStatus = null;
248 transactionStatus = transaction.commit().get();
249 } catch (InterruptedException e) {
250 throw new RuntimeException("Interrupted while waiting for response", e);
251 } catch (ExecutionException e) {
252 throw new RuntimeException("Read configuration data " + path + " failed", e);
254 // TODO better ex handling
256 if (transactionStatus.isSuccessful()) {
257 logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
259 logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
260 logger.debug("Update device state transaction status " + transaction.getStatus());
265 public CompositeNode readConfigurationData(InstanceIdentifier path) {
266 RpcResult<CompositeNode> result = null;
268 result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
269 wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
270 } catch (InterruptedException e) {
271 throw new RuntimeException("Interrupted while waiting for response", e);
272 } catch (ExecutionException e) {
273 throw new RuntimeException("Read configuration data " + path + " failed", e);
276 CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
277 return data == null ? null : (CompositeNode) findNode(data, path);
281 public CompositeNode readOperationalData(InstanceIdentifier path) {
282 RpcResult<CompositeNode> result = null;
284 result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
285 } catch (InterruptedException e) {
286 throw new RuntimeException("Interrupted while waiting for response", e);
287 } catch (ExecutionException e) {
288 throw new RuntimeException("Read configuration data " + path + " failed", e);
291 CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
292 return (CompositeNode) findNode(data, path);
296 public Set<QName> getSupportedRpcs() {
297 return Collections.emptySet();
301 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
302 return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
306 public Collection<ProviderFunctionality> getProviderFunctionality() {
307 return Collections.emptySet();
311 public void onSessionInitiated(ProviderSession session) {
312 dataBroker = session.getService(DataBrokerService.class);
314 processingExecutor.submit(new Runnable() {
317 updateInitialState();
321 mountService = session.getService(MountProvisionService.class);
322 if (mountService != null) {
323 mountInstance = mountService.createOrGetMountPoint(path);
327 private void updateInitialState() {
328 checkDataStoreState();
330 DataModificationTransaction transaction = dataBroker.beginTransaction();
331 if (operationalNodeNotExisting(transaction)) {
332 transaction.putOperationalData(path, getNodeWithId());
334 if (configurationNodeNotExisting(transaction)) {
335 transaction.putConfigurationData(path, getNodeWithId());
339 transaction.commit().get();
340 } catch (InterruptedException e) {
341 throw new RuntimeException("Interrupted while waiting for response", e);
342 } catch (ExecutionException e) {
343 throw new RuntimeException("Read configuration data " + path + " failed", e);
347 private void checkDataStoreState() {
348 // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore
349 dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder(
350 Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build()); }
352 CompositeNode getNodeWithId() {
353 SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
354 return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
357 boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
358 return null == transaction.readConfigurationData(path);
361 boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
362 return null == transaction.readOperationalData(path);
365 static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
367 Node<?> current = node;
368 for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
369 if (current instanceof SimpleNode<?>) {
371 } else if (current instanceof CompositeNode) {
372 CompositeNode currentComposite = (CompositeNode) current;
374 current = currentComposite.getFirstCompositeByName(arg.getNodeType());
375 if (current == null) {
376 current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
378 if (current == null) {
379 current = currentComposite.getFirstSimpleByName(arg.getNodeType());
381 if (current == null) {
382 current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
384 if (current == null) {
393 public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
394 DataModification<InstanceIdentifier, CompositeNode> modification) {
395 NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
396 modification, true, rollbackSupported);
398 twoPhaseCommit.prepare();
399 } catch (InterruptedException e) {
400 throw new RuntimeException("Interrupted while waiting for response", e);
401 } catch (ExecutionException e) {
402 throw new RuntimeException("Read configuration data " + path + " failed", e);
404 return twoPhaseCommit;
407 Set<QName> getCapabilities(Collection<String> capabilities) {
408 return FluentIterable.from(capabilities).filter(new Predicate<String>() {
410 public boolean apply(final String capability) {
411 return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
413 }).transform(new Function<String, QName>() {
415 public QName apply(final String capability) {
416 String[] parts = capability.split("\\?");
417 String namespace = parts[0];
418 FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
420 String revision = getStringAndTransform(queryParams, "revision=", "revision=");
422 String moduleName = getStringAndTransform(queryParams, "module=", "module=");
424 if (revision == null) {
425 logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
426 revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
428 if (revision != null) {
429 logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
432 if (revision == null) {
433 return QName.create(URI.create(namespace), null, moduleName);
435 return QName.create(namespace, revision, moduleName);
438 private String getStringAndTransform(final Iterable<String> queryParams, final String match,
439 final String substringToRemove) {
440 Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
442 public boolean apply(final String input) {
443 return input.startsWith(match);
447 return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
454 public void close() {
458 public String getName() {
462 public InetSocketAddress getSocketAddress() {
463 return socketAddress;
466 public MountProvisionInstance getMountInstance() {
467 return mountInstance;
470 public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
471 this.reconnectStrategy = reconnectStrategy;
474 public void setProcessingExecutor(final ExecutorService processingExecutor) {
475 this.processingExecutor = processingExecutor;
478 public void setSocketAddress(final InetSocketAddress socketAddress) {
479 this.socketAddress = socketAddress;
482 public void setEventExecutor(final EventExecutor eventExecutor) {
483 this.eventExecutor = eventExecutor;
486 public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
487 this.schemaSourceProvider = schemaSourceProvider;
490 public void setDispatcher(final NetconfClientDispatcher dispatcher) {
491 this.dispatcher = dispatcher;
494 public void setClientConfig(final NetconfClientConfiguration clientConfig) {
495 this.clientConfig = clientConfig;
498 public void setDataProviderService(final DataProviderService dataProviderService) {
499 this.dataProviderService = dataProviderService;
503 class NetconfDeviceSchemaContextProvider {
505 NetconfDevice device;
507 SchemaSourceProvider<InputStream> sourceProvider;
509 Optional<SchemaContext> currentContext;
511 NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
512 this.device = device;
513 this.sourceProvider = sourceProvider;
514 this.currentContext = Optional.absent();
517 void createContextFromCapabilities(Iterable<QName> capabilities) {
518 YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
519 if (!sourceContext.getMissingSources().isEmpty()) {
520 device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
522 device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
523 List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
524 if (!sourceContext.getValidSources().isEmpty()) {
525 SchemaContext schemaContext = tryToCreateContext(modelsToParse);
526 currentContext = Optional.fromNullable(schemaContext);
528 currentContext = Optional.absent();
530 if (currentContext.isPresent()) {
531 device.logger.debug("Schema context successfully created.");
535 SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
536 YangParserImpl parser = new YangParserImpl();
539 Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
540 return parser.resolveSchemaContext(models);
541 } catch (Exception e) {
542 device.logger.debug("Error occured during parsing YANG schemas", e);