Bump upstreams
[netconf.git] / plugins / netconf-client-mdsal / src / main / java / org / opendaylight / netconf / client / mdsal / NetconfDevice.java
index 67e5535940b19596f45596bce1eaf866a62f8158..1e3ca9148befb53663b3abbe0536b9e0f667ff1f 100644 (file)
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
@@ -37,7 +38,7 @@ import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.netconf.api.CapabilityURN;
-import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.api.messages.NetconfMessage;
 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
 import org.opendaylight.netconf.client.mdsal.api.NetconfDeviceSchemasResolver;
@@ -52,11 +53,14 @@ import org.opendaylight.netconf.client.mdsal.impl.BaseSchema;
 import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil;
 import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformer;
 import org.opendaylight.netconf.client.mdsal.spi.NetconfDeviceRpc;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.Get;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscription;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.available.capabilities.AvailableCapability;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.available.capabilities.AvailableCapabilityBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.unavailable.capabilities.UnavailableCapability;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.unavailable.capabilities.UnavailableCapability.FailureReason;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.available.capabilities.AvailableCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.available.capabilities.AvailableCapabilityBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.unavailable.capabilities.UnavailableCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.unavailable.capabilities.UnavailableCapability.FailureReason;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.rfc8528.model.api.SchemaMountConstants;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -64,13 +68,14 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.MountPointContext;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.source.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
-import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
-import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,12 +106,11 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
     private final boolean reconnectOnSchemasChange;
     private final BaseNetconfSchemas baseSchemas;
 
+    @GuardedBy("this")
+    private ListenableFuture<List<Object>> schemaFuturesList;
     @GuardedBy("this")
     private boolean connected = false;
 
-    // Message transformer is constructed once the schemas are available
-    private NetconfMessageTransformer messageTransformer;
-
     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas,
             final RemoteDeviceId id, final RemoteDeviceHandler salFacade, final Executor globalProcessingExecutor,
             final boolean reconnectOnSchemasChange) {
@@ -130,22 +134,19 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
     }
 
     @Override
-    public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
-                                  final NetconfDeviceCommunicator listener) {
-        // SchemaContext setup has to be performed in a dedicated thread since
-        // we are in a netty thread in this method
-        // Yang models are being downloaded in this method and it would cause a
-        // deadlock if we used the netty thread
-        // http://netty.io/wiki/thread-model.html
+    public synchronized void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
+            final NetconfDeviceCommunicator listener) {
+        // SchemaContext setup has to be performed in a dedicated thread since we are in a Netty thread in this method
+        // YANG models are being downloaded in this method and it would cause a deadlock if we used the netty thread
+        // https://netty.io/wiki/thread-model.html
         setConnected(true);
         LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
 
         final BaseSchema baseSchema = resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported());
-        final NetconfDeviceRpc initRpc = new NetconfDeviceRpc(baseSchema.getEffectiveModelContext(), listener,
+        final NetconfDeviceRpc initRpc = new NetconfDeviceRpc(baseSchema.modelContext(), listener,
             new NetconfMessageTransformer(baseSchema.getMountPointContext(), false, baseSchema));
-        final ListenableFuture<DeviceSources> sourceResolverFuture = Futures.submit(
-            new DeviceSourcesResolver(id, baseSchema, initRpc, remoteSessionCapabilities, stateSchemasResolver),
-            processingExecutor);
+        final var sourceResolverFuture = Futures.submit(new DeviceSourcesResolver(id, baseSchema, initRpc,
+                remoteSessionCapabilities, stateSchemasResolver), processingExecutor);
 
         if (shouldListenOnSchemaChange(remoteSessionCapabilities)) {
             registerToBaseNetconfStream(initRpc, listener);
@@ -155,13 +156,14 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         final var futureSchema = Futures.transformAsync(sourceResolverFuture,
             deviceSources -> assembleSchemaContext(deviceSources, remoteSessionCapabilities), processingExecutor);
 
-        Futures.addCallback(
-            // Potentially acquire mount point list and interpret it
-            Futures.transformAsync(futureSchema,
-                result -> Futures.transform(createMountPointContext(result.modelContext(), baseSchema, listener),
-                    mount -> new NetconfDeviceSchema(result.capabilities(), mount), processingExecutor),
-                processingExecutor),
-            new FutureCallback<>() {
+        // Potentially acquire mount point list and interpret it
+        final var netconfDeviceSchemaFuture = Futures.transformAsync(futureSchema,
+            result -> Futures.transform(createMountPointContext(result.modelContext(), baseSchema, listener),
+                mount -> new NetconfDeviceSchema(result.capabilities(), mount), processingExecutor),
+            processingExecutor);
+        schemaFuturesList = Futures.allAsList(sourceResolverFuture, futureSchema, netconfDeviceSchemaFuture);
+
+        Futures.addCallback(netconfDeviceSchemaFuture, new FutureCallback<>() {
                 @Override
                 public void onSuccess(final NetconfDeviceSchema result) {
                     handleSalInitializationSuccess(listener, result, remoteSessionCapabilities,
@@ -170,7 +172,13 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
 
                 @Override
                 public void onFailure(final Throwable cause) {
-                    handleSalInitializationFailure(listener, cause);
+                    // The method onRemoteSessionDown was called while the EffectiveModelContext for the device
+                    // was being processed.
+                    if (cause instanceof CancellationException) {
+                        LOG.warn("{}: Device communicator was tear down since the schema setup started", id);
+                    } else {
+                        handleSalInitializationFailure(listener, cause);
+                    }
                 }
             }, MoreExecutors.directExecutor());
     }
@@ -180,9 +188,11 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         // TODO check whether the model describing create subscription is present in schema
         // Perhaps add a default schema context to support create-subscription if the model was not provided
         // (same as what we do for base netconf operations in transformer)
-        final var rpcResultListenableFuture = deviceRpc.invokeRpc(
-                NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME,
-                NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+        final var rpcResultListenableFuture = deviceRpc.domRpcService()
+            .invokeRpc(CreateSubscription.QNAME, Builders.containerBuilder()
+                .withNodeIdentifier(NodeIdentifier.create(CreateSubscriptionInput.QNAME))
+                // Note: default 'stream' is 'NETCONF', we do not need to create an explicit leaf
+                .build());
 
         Futures.addCallback(rpcResultListenableFuture, new FutureCallback<DOMRpcResult>() {
             @Override
@@ -221,7 +231,7 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
             return;
         }
 
-        messageTransformer = new NetconfMessageTransformer(deviceSchema.mountContext(), true,
+        final var messageTransformer = new NetconfMessageTransformer(deviceSchema.mountContext(), true,
             resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported()));
 
         // Order is important here: salFacade has to see the device come up and then the notificationHandler can deliver
@@ -235,26 +245,22 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
     }
 
     private void handleSalInitializationFailure(final RemoteDeviceCommunicator listener, final Throwable cause) {
-        // FIXME: pick one of these messages
         LOG.warn("{}: Unexpected error resolving device sources", id, cause);
-        LOG.error("{}: Initialization in sal failed, disconnecting from device", id, cause);
         listener.close();
-        onRemoteSessionDown();
-        resetMessageTransformer();
-
-        // FIXME: this causes salFacade to see onDeviceDisconnected() and then onDeviceFailed(), which is quite weird
+        cleanupInitialization();
         salFacade.onDeviceFailed(cause);
     }
 
-    /**
-     * Set the transformer to null as is in initial state.
-     */
-    private void resetMessageTransformer() {
-        updateTransformer(null);
-    }
-
-    private synchronized void updateTransformer(final NetconfMessageTransformer transformer) {
-        messageTransformer = transformer;
+    private synchronized void cleanupInitialization() {
+        connected = false;
+        if (schemaFuturesList != null && !schemaFuturesList.isDone()) {
+            if (!schemaFuturesList.cancel(true)) {
+                LOG.warn("The cleanup of Schema Futures for device {} was unsuccessful.", id);
+            }
+        }
+        notificationHandler.onRemoteSchemaDown();
+        sourceRegistrations.forEach(Registration::close);
+        sourceRegistrations.clear();
     }
 
     private synchronized void setConnected(final boolean connected) {
@@ -283,10 +289,10 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         final NetconfDeviceRpc deviceRpc = new NetconfDeviceRpc(schemaContext, listener,
             new NetconfMessageTransformer(emptyContext, false, baseSchema));
 
-        return Futures.transform(deviceRpc.invokeRpc(NetconfMessageTransformUtil.NETCONF_GET_QNAME,
-            Builders.containerBuilder().withNodeIdentifier(NETCONF_GET_NODEID)
-                .withChild(NetconfMessageTransformUtil.toFilterStructure(RFC8528_SCHEMA_MOUNTS, schemaContext))
-                .build()), rpcResult -> processSchemaMounts(rpcResult, emptyContext), MoreExecutors.directExecutor());
+        return Futures.transform(deviceRpc.domRpcService().invokeRpc(Get.QNAME, ImmutableNodes.newContainerBuilder()
+            .withNodeIdentifier(NETCONF_GET_NODEID)
+            .withChild(NetconfMessageTransformUtil.toFilterStructure(RFC8528_SCHEMA_MOUNTS, schemaContext))
+            .build()), rpcResult -> processSchemaMounts(rpcResult, emptyContext), MoreExecutors.directExecutor());
     }
 
     private MountPointContext processSchemaMounts(final DOMRpcResult rpcResult, final MountPointContext emptyContext) {
@@ -305,19 +311,8 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
 
     @Override
     public void onRemoteSessionDown() {
-        setConnected(false);
-        notificationHandler.onRemoteSchemaDown();
-
+        cleanupInitialization();
         salFacade.onDeviceDisconnected();
-        sourceRegistrations.forEach(Registration::close);
-        sourceRegistrations.clear();
-        resetMessageTransformer();
-    }
-
-    @Override
-    public void onRemoteSessionFailed(final Throwable throwable) {
-        setConnected(false);
-        salFacade.onDeviceFailed(throwable);
     }
 
     @Override
@@ -326,12 +321,12 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
     }
 
     private BaseSchema resolveBaseSchema(final boolean notificationSupport) {
-        return notificationSupport ? baseSchemas.getBaseSchemaWithNotifications() : baseSchemas.getBaseSchema();
+        return notificationSupport ? baseSchemas.baseSchemaWithNotifications() : baseSchemas.baseSchema();
     }
 
     protected NetconfDeviceRpc getDeviceSpecificRpc(final MountPointContext result,
             final RemoteDeviceCommunicator listener, final BaseSchema schema) {
-        return new NetconfDeviceRpc(result.getEffectiveModelContext(), listener,
+        return new NetconfDeviceRpc(result.modelContext(), listener,
             new NetconfMessageTransformer(result, true, schema));
     }
 
@@ -419,10 +414,10 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
             if (remoteSessionCapabilities.containsNonModuleCapability(CapabilityURN.NOTIFICATION)) {
                 // FIXME: mutable collection modification!
                 deviceSources.getRequiredSourcesQName().addAll(List.of(
-                    org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714
-                        .$YangModuleInfoImpl.getInstance().getName(),
-                    org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715
-                        .$YangModuleInfoImpl.getInstance().getName())
+                    org.opendaylight.yang.svc.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714
+                        .YangModuleInfoImpl.getInstance().getName(),
+                    org.opendaylight.yang.svc.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715
+                        .YangModuleInfoImpl.getInstance().getName())
                 );
             }
 
@@ -498,7 +493,7 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         private List<SourceIdentifier> filterMissingSources(final Collection<SourceIdentifier> origSources) {
             return origSources.parallelStream().filter(sourceIdentifier -> {
                 try {
-                    schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class).get();
+                    schemaRepository.getSchemaSource(sourceIdentifier, YangTextSource.class).get();
                     return false;
                 } catch (InterruptedException | ExecutionException e) {
                     return true;
@@ -515,7 +510,7 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         private List<SourceIdentifier> handleMissingSchemaSourceException(
                 final MissingSchemaSourceException exception) {
             // In case source missing, try without it
-            final SourceIdentifier missingSource = exception.getSourceId();
+            final SourceIdentifier missingSource = exception.sourceId();
             LOG.warn("{}: Unable to build schema context, missing source {}, will reattempt without it",
                 id, missingSource);
             LOG.debug("{}: Unable to build schema context, missing source {}, will reattempt without it",
@@ -532,16 +527,16 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
             // In case resolution error, try only with resolved sources
             // There are two options why schema resolution exception occurred : unsatisfied imports or flawed model
             // FIXME Do we really have assurance that these two cases cannot happen at once?
-            if (resolutionException.getFailedSource() != null) {
+            final var failedSourceId = resolutionException.sourceId();
+            if (failedSourceId != null) {
                 // flawed model - exclude it
-                final SourceIdentifier failedSourceId = resolutionException.getFailedSource();
                 LOG.warn("{}: Unable to build schema context, failed to resolve source {}, will reattempt without it",
                     id, failedSourceId);
                 LOG.warn("{}: Unable to build schema context, failed to resolve source {}, will reattempt without it",
                     id, failedSourceId, resolutionException);
                 addUnresolvedCapabilities(getQNameFromSourceIdentifiers(List.of(failedSourceId)),
                         UnavailableCapability.FailureReason.UnableToResolve);
-                return stripUnavailableSource(resolutionException.getFailedSource());
+                return stripUnavailableSource(failedSourceId);
             }
             // unsatisfied imports
             addUnresolvedCapabilities(