2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.client.mdsal;
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_NODEID;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.Executor;
19 import org.checkerframework.checker.lock.qual.GuardedBy;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
22 import org.opendaylight.netconf.api.messages.NetconfMessage;
23 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchema;
24 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemaProvider;
25 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
26 import org.opendaylight.netconf.client.mdsal.api.DeviceNetconfSchemaProvider;
27 import org.opendaylight.netconf.client.mdsal.api.NetconfRpcService;
28 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
29 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
30 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceCommunicator;
31 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
32 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
33 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
34 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
35 import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil;
36 import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformer;
37 import org.opendaylight.netconf.client.mdsal.spi.NetconfDeviceRpc;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.Get;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscription;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
41 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
42 import org.opendaylight.yangtools.rfc8528.model.api.SchemaMountConstants;
43 import org.opendaylight.yangtools.yang.common.QName;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.schema.MountPointContext;
47 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
48 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
53 * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade.
55 public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
// Class-wide SLF4J logger.
56 private static final Logger LOG = LoggerFactory.getLogger(NetconfDevice.class);
// Interned QName of the "schema-mounts" node inside the RFC 8528 module.
57 private static final QName RFC8528_SCHEMA_MOUNTS_QNAME = QName.create(
58 SchemaMountConstants.RFC8528_MODULE, "schema-mounts").intern();
// Single-step path to that node; createMountPointContext() uses it to build the filter of its Get request.
59 private static final YangInstanceIdentifier RFC8528_SCHEMA_MOUNTS = YangInstanceIdentifier.of(
60 NodeIdentifier.create(RFC8528_SCHEMA_MOUNTS_QNAME));
// Identifier of the remote device; used as the prefix of most log messages in this class.
62 protected final RemoteDeviceId id;
// Supplies the base NETCONF schema matching the capabilities advertised by a session.
63 private final BaseNetconfSchemaProvider baseSchemaProvider;
// Supplies the future device schema requested in onRemoteSessionUp().
64 private final DeviceNetconfSchemaProvider deviceSchemaProvider;
// Executor handed to the device schema provider and used for the mount-point transformation step.
65 private final Executor processingExecutor;
// Receives the onDeviceConnected / onDeviceFailed / onDeviceDisconnected callbacks issued by this class.
67 private final RemoteDeviceHandler salFacade;
// May be null: the six-argument constructor passes null and handleSalInitializationSuccess() checks for it.
68 private final DeviceActionFactory deviceActionFactory;
// Created in the constructor from salFacade and id; receives notifications and schema up/down signals.
69 private final NotificationHandler notificationHandler;
// When true (and the session supports notifications) the device subscribes to the base notification stream.
70 private final boolean reconnectOnSchemasChange;
// Future of the schema setup started in onRemoteSessionUp(); cleanupInitialization() tries to cancel it.
73 private ListenableFuture<?> schemaFuture;
// NOTE(review): no write to this flag is visible in this excerpt -- confirm where it is toggled.
75 private boolean connected = false;
/**
 * Creates a device without a {@link DeviceActionFactory}: delegates to the seven-argument constructor, passing
 * {@code null} as the action factory.
 */
77 public NetconfDevice(final RemoteDeviceId id,final BaseNetconfSchemaProvider baseSchemaProvider,
78 final DeviceNetconfSchemaProvider deviceSchemaProvider, final RemoteDeviceHandler salFacade,
79 final Executor globalProcessingExecutor, final boolean reconnectOnSchemasChange) {
80 this(id, baseSchemaProvider, deviceSchemaProvider, salFacade, globalProcessingExecutor,
81 reconnectOnSchemasChange, null);
/**
 * Creates a device, storing all collaborators and building the {@code NotificationHandler} from
 * {@code salFacade} and {@code id}.
 *
 * @param id identifier of the remote device, must not be null
 * @param baseSchemaProvider provider of the base NETCONF schema, must not be null
 * @param deviceSchemaProvider provider of the device schema, must not be null
 * @param salFacade handler receiving device state callbacks; stored without a null check
 * @param globalProcessingExecutor executor stored as the processing executor, must not be null
 * @param reconnectOnSchemasChange whether schema-change notifications should be subscribed to
 * @param deviceActionFactory factory for device actions; stored without a null check, {@code null} is tolerated
 * @throws NullPointerException if {@code id}, {@code baseSchemaProvider}, {@code deviceSchemaProvider} or
 *         {@code globalProcessingExecutor} is null
 */
84 public NetconfDevice(final RemoteDeviceId id, final BaseNetconfSchemaProvider baseSchemaProvider,
85 final DeviceNetconfSchemaProvider deviceSchemaProvider, final RemoteDeviceHandler salFacade,
86 final Executor globalProcessingExecutor, final boolean reconnectOnSchemasChange,
87 final DeviceActionFactory deviceActionFactory) {
88 this.id = requireNonNull(id);
89 this.baseSchemaProvider = requireNonNull(baseSchemaProvider);
90 this.deviceSchemaProvider = requireNonNull(deviceSchemaProvider);
91 this.reconnectOnSchemasChange = reconnectOnSchemasChange;
92 this.deviceActionFactory = deviceActionFactory;
93 this.salFacade = salFacade;
94 processingExecutor = requireNonNull(globalProcessingExecutor);
95 notificationHandler = new NotificationHandler(salFacade, id);
/**
 * Starts asynchronous schema setup for a freshly established session.
 *
 * <p>Resolves the base schema for the advertised capabilities, requests the device schema from the device schema
 * provider, chains mount-point discovery onto it and remembers the resulting future in {@code schemaFuture}.
 * The completion callback runs on a direct executor and hands the outcome to
 * {@code handleSalInitializationSuccess} or {@code handleSalInitializationFailure}.
 *
 * @param remoteSessionCapabilities capabilities reported for the session
 * @param listener communicator used to talk to the device
 */
99 public synchronized void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
100 final NetconfDeviceCommunicator listener) {
101 // EffectiveModelContext setup has to be performed in a dedicated thread since we are in a Netty thread here.
103 // YANG models are being downloaded in this method and it would cause a deadlock if we used the netty thread
104 // https://netty.io/wiki/thread-model.html
106 LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
// Bootstrap RPC invoker built purely from the base schema; it is handed to the device schema provider.
108 final var baseSchema = baseSchemaProvider.baseSchemaForCapabilities(remoteSessionCapabilities);
109 final var initRpc = new NetconfDeviceRpc(baseSchema.modelContext(), listener,
110 new NetconfMessageTransformer(baseSchema.mountPointContext(), false, baseSchema));
112 final var deviceSchema = deviceSchemaProvider.deviceNetconfSchemaFor(id, remoteSessionCapabilities, initRpc,
113 baseSchema, processingExecutor);
115 // Potentially acquire mount point list and interpret it
// The inner transform pairs the resolved capabilities with the mount point context on processingExecutor.
116 final var netconfDeviceSchemaFuture = Futures.transformAsync(deviceSchema,
117 result -> Futures.transform(createMountPointContext(result.modelContext(), baseSchema, listener),
118 mount -> new NetconfDeviceSchema(result.capabilities(), mount), processingExecutor),
// Kept so that cleanupInitialization() can cancel a setup that is still in flight.
120 schemaFuture = netconfDeviceSchemaFuture;
122 Futures.addCallback(netconfDeviceSchemaFuture, new FutureCallback<>() {
124 public void onSuccess(final NetconfDeviceSchema result) {
125 handleSalInitializationSuccess(listener, baseSchema, result, remoteSessionCapabilities,
126 getDeviceSpecificRpc(result.mountContext(), listener, baseSchema));
130 public void onFailure(final Throwable cause) {
131 // The method onRemoteSessionDown was called while the EffectiveModelContext for the device was still being
// set up, which cancels schemaFuture and surfaces here as a CancellationException.
// NOTE(review): the branch structure separating the two calls below is not visible in this excerpt -- confirm.
133 if (cause instanceof CancellationException) {
134 LOG.warn("{}: Device communicator was tear down since the schema setup started", id);
136 handleSalInitializationFailure(listener, cause);
139 }, MoreExecutors.directExecutor());
/**
 * Invokes the {@code create-subscription} RPC on the device and, once it succeeds, installs a notification
 * filter that disconnects the session when a {@code NetconfCapabilityChange} notification is seen.
 * A failed subscription is only logged.
 *
 * @param deviceRpc RPC service used to invoke {@code create-subscription}
 * @param listener communicator that is disconnected when a capability change is detected
 */
142 private void registerToBaseNetconfStream(final NetconfRpcService deviceRpc,
143 final NetconfDeviceCommunicator listener) {
144 // TODO check whether the model describing create subscription is present in schema
145 // Perhaps add a default schema context to support create-subscription if the model was not provided
146 // (same as what we do for base netconf operations in transformer)
147 final var rpcResultListenableFuture = deviceRpc.invokeNetconf(CreateSubscription.QNAME,
148 ImmutableNodes.newContainerBuilder()
149 .withNodeIdentifier(NodeIdentifier.create(CreateSubscriptionInput.QNAME))
150 // Note: default 'stream' is 'NETCONF', we do not need to create an explicit leaf
// The callback runs on a direct executor, i.e. on whichever thread completes the RPC future.
153 Futures.addCallback(rpcResultListenableFuture, new FutureCallback<DOMRpcResult>() {
155 public void onSuccess(final DOMRpcResult domRpcResult) {
// NOTE(review): the value returned by this filter is not visible in this excerpt -- confirm.
156 notificationHandler.addNotificationFilter(notification -> {
157 if (NetconfCapabilityChange.QNAME.equals(notification.getBody().name().getNodeType())) {
158 LOG.info("{}: Schemas change detected, reconnecting", id);
159 // Only disconnect is enough,
160 // the reconnecting nature of the connector will take care of reconnecting
161 listener.disconnect();
169 public void onFailure(final Throwable throwable) {
170 LOG.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly",
173 }, MoreExecutors.directExecutor());
/**
 * Decides whether the device should subscribe to schema-change notifications.
 *
 * @param remoteSessionCapabilities capabilities reported for the session
 * @return {@code true} only if the session reports notification support and {@code reconnectOnSchemasChange}
 *         is enabled
 */
176 private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) {
177 return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
/**
 * Completes a successful schema setup: optionally subscribes to the base notification stream, reports the device
 * as connected to {@code salFacade} and then releases the notification handler.
 *
 * @param listener communicator used to talk to the device
 * @param baseSchema base NETCONF schema used for the message transformer
 * @param deviceSchema resolved device schema, including its mount point context
 * @param remoteSessionCapabilities capabilities reported for the session
 * @param deviceRpc RPC services bound to the resolved device schema
 */
180 private synchronized void handleSalInitializationSuccess(final NetconfDeviceCommunicator listener,
181 final BaseNetconfSchema baseSchema, final NetconfDeviceSchema deviceSchema,
182 final NetconfSessionPreferences remoteSessionCapabilities, final Rpcs deviceRpc) {
183 // NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing,
184 // since salFacade.onDeviceDisconnected was already called.
// NOTE(review): the condition guarding this warning is not visible in this excerpt -- confirm.
186 LOG.warn("{}: Device communicator was closed before schema setup finished.", id);
190 if (shouldListenOnSchemaChange(remoteSessionCapabilities)) {
191 registerToBaseNetconfStream(deviceRpc, listener);
// Transformer bound to the device's mount point context; shared by the action service and the notification handler.
194 final var messageTransformer = new NetconfMessageTransformer(deviceSchema.mountContext(), true, baseSchema);
196 // Order is important here: salFacade has to see the device come up and then the notificationHandler can deliver
197 // whatever notifications have been held back
// The action service is null when no DeviceActionFactory was supplied.
198 salFacade.onDeviceConnected(deviceSchema, remoteSessionCapabilities,
199 new RemoteDeviceServices(deviceRpc, deviceActionFactory == null ? null
200 : deviceActionFactory.createDeviceAction(messageTransformer, listener)));
201 notificationHandler.onRemoteSchemaUp(messageTransformer);
203 LOG.info("{}: Netconf connector initialized successfully", id);
/**
 * Handles a failed schema setup: logs the cause, cleans up initialization state and reports the failure to
 * {@code salFacade}.
 *
 * @param listener communicator of the failed session; not referenced by the lines visible here
 * @param cause reason the schema setup failed
 */
206 private void handleSalInitializationFailure(final RemoteDeviceCommunicator listener, final Throwable cause) {
207 LOG.warn("{}: Unexpected error resolving device sources", id, cause);
209 cleanupInitialization();
210 salFacade.onDeviceFailed(cause);
/**
 * Releases state left over from schema setup: tries to cancel a still-pending {@code schemaFuture} (logging a
 * warning if the cancellation is refused) and signals the notification handler that the schema is down.
 */
213 private synchronized void cleanupInitialization() {
// cancel(true) is only attempted for a future that exists and has not completed yet.
215 if (schemaFuture != null && !schemaFuture.isDone() && !schemaFuture.cancel(true)) {
216 LOG.warn("The cleanup of Schema Futures for device {} was unsuccessful.", id);
218 notificationHandler.onRemoteSchemaDown();
/**
 * Builds the mount point context for a resolved device schema.
 *
 * <p>If the schema does not contain the RFC 8528 module, an already-completed future holding an empty context is
 * returned. Otherwise a Get RPC filtered to the schema-mounts node is issued and its result is converted by
 * {@code processSchemaMounts}.
 *
 * @param schemaContext resolved model context of the device
 * @param baseSchema base NETCONF schema used for the temporary message transformer
 * @param listener communicator used to send the Get request
 * @return future completing with the mount point context
 */
221 private ListenableFuture<@NonNull MountPointContext> createMountPointContext(
222 final EffectiveModelContext schemaContext, final BaseNetconfSchema baseSchema,
223 final NetconfDeviceCommunicator listener) {
224 final var emptyContext = MountPointContext.of(schemaContext);
// Without the RFC 8528 module in the schema there is nothing to ask the device for.
225 if (schemaContext.findModule(SchemaMountConstants.RFC8528_MODULE).isEmpty()) {
226 return Futures.immediateFuture(emptyContext);
229 // Create a temporary RPC invoker and acquire the mount point tree
230 LOG.debug("{}: Acquiring available mount points", id);
231 final NetconfDeviceRpc deviceRpc = new NetconfDeviceRpc(schemaContext, listener,
232 new NetconfMessageTransformer(emptyContext, false, baseSchema));
// The result conversion runs on a direct executor, i.e. on the thread that completes the RPC future.
234 return Futures.transform(deviceRpc.domRpcService().invokeRpc(Get.QNAME, ImmutableNodes.newContainerBuilder()
235 .withNodeIdentifier(NETCONF_GET_NODEID)
236 .withChild(NetconfMessageTransformUtil.toFilterStructure(RFC8528_SCHEMA_MOUNTS, schemaContext))
237 .build()), rpcResult -> processSchemaMounts(rpcResult, emptyContext), MoreExecutors.directExecutor());
/**
 * Converts the result of the schema-mounts Get request into a mount point context.
 *
 * @param rpcResult result of the Get RPC
 * @param emptyContext context without mount points, used as the basis of the returned context
 * @return context created from {@code emptyContext} and the returned schema-mounts data
 */
240 private MountPointContext processSchemaMounts(final DOMRpcResult rpcResult, final MountPointContext emptyContext) {
// Errors are only reported here; the visible lines do not abort processing because of them.
241 final var errors = rpcResult.errors();
242 if (!errors.isEmpty()) {
243 LOG.warn("{}: Schema-mounts acquisition resulted in errors {}", id, errors);
245 final var schemaMounts = rpcResult.value();
// NOTE(review): what is returned when the value is null is not visible in this excerpt -- confirm.
246 if (schemaMounts == null) {
247 LOG.debug("{}: device does not define any schema mounts", id);
251 return DeviceMountPointContext.create(emptyContext, schemaMounts);
/**
 * Handles loss of the session: cleans up initialization state first, then reports the disconnect to
 * {@code salFacade}.
 */
255 public void onRemoteSessionDown() {
256 cleanupInitialization();
257 salFacade.onDeviceDisconnected();
/**
 * Forwards an incoming notification message to the notification handler.
 *
 * @param notification message received from the device
 */
261 public void onNotification(final NetconfMessage notification) {
262 notificationHandler.handleNotification(notification);
/**
 * Creates the RPC invoker bound to the resolved device schema. Being {@code protected}, it can be overridden by
 * subclasses.
 *
 * @param result mount point context of the device; its model context backs the returned invoker
 * @param listener communicator used to send requests
 * @param schema base NETCONF schema passed to the message transformer
 * @return a new {@link NetconfDeviceRpc} using a transformer built from {@code result} and {@code schema}
 */
265 protected NetconfDeviceRpc getDeviceSpecificRpc(final MountPointContext result,
266 final RemoteDeviceCommunicator listener, final BaseNetconfSchema schema) {
267 return new NetconfDeviceRpc(result.modelContext(), listener,
268 new NetconfMessageTransformer(result, true, schema));
272 * A dedicated exception to indicate when we fail to set up an {@link EffectiveModelContext}.
274 public static final class EmptySchemaContextException extends Exception {
276 private static final long serialVersionUID = 1L;
278 public EmptySchemaContextException(final String message) {