Bump upstreams
[netconf.git] / plugins / netconf-client-mdsal / src / main / java / org / opendaylight / netconf / client / mdsal / NetconfDevice.java
index b530e75b637edaadbaf722936c0632daf39ae534..1e3ca9148befb53663b3abbe0536b9e0f667ff1f 100644 (file)
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
@@ -52,11 +53,14 @@ import org.opendaylight.netconf.client.mdsal.impl.BaseSchema;
 import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil;
 import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformer;
 import org.opendaylight.netconf.client.mdsal.spi.NetconfDeviceRpc;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.Get;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscription;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.available.capabilities.AvailableCapability;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.available.capabilities.AvailableCapabilityBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.unavailable.capabilities.UnavailableCapability;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.unavailable.capabilities.UnavailableCapability.FailureReason;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.available.capabilities.AvailableCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.available.capabilities.AvailableCapabilityBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.unavailable.capabilities.UnavailableCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.unavailable.capabilities.UnavailableCapability.FailureReason;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.rfc8528.model.api.SchemaMountConstants;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -64,13 +68,14 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.MountPointContext;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.source.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
-import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
-import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,6 +106,8 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
     private final boolean reconnectOnSchemasChange;
     private final BaseNetconfSchemas baseSchemas;
 
+    @GuardedBy("this")
+    private ListenableFuture<List<Object>> schemaFuturesList;
     @GuardedBy("this")
     private boolean connected = false;
 
@@ -127,8 +134,8 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
     }
 
     @Override
-    public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
-                                  final NetconfDeviceCommunicator listener) {
+    public synchronized void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
+            final NetconfDeviceCommunicator listener) {
         // SchemaContext setup has to be performed in a dedicated thread since we are in a Netty thread in this method
         // YANG models are being downloaded in this method and it would cause a deadlock if we used the netty thread
         // https://netty.io/wiki/thread-model.html
@@ -136,11 +143,10 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
 
         final BaseSchema baseSchema = resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported());
-        final NetconfDeviceRpc initRpc = new NetconfDeviceRpc(baseSchema.getEffectiveModelContext(), listener,
+        final NetconfDeviceRpc initRpc = new NetconfDeviceRpc(baseSchema.modelContext(), listener,
             new NetconfMessageTransformer(baseSchema.getMountPointContext(), false, baseSchema));
-        final ListenableFuture<DeviceSources> sourceResolverFuture = Futures.submit(
-            new DeviceSourcesResolver(id, baseSchema, initRpc, remoteSessionCapabilities, stateSchemasResolver),
-            processingExecutor);
+        final var sourceResolverFuture = Futures.submit(new DeviceSourcesResolver(id, baseSchema, initRpc,
+                remoteSessionCapabilities, stateSchemasResolver), processingExecutor);
 
         if (shouldListenOnSchemaChange(remoteSessionCapabilities)) {
             registerToBaseNetconfStream(initRpc, listener);
@@ -150,13 +156,14 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         final var futureSchema = Futures.transformAsync(sourceResolverFuture,
             deviceSources -> assembleSchemaContext(deviceSources, remoteSessionCapabilities), processingExecutor);
 
-        Futures.addCallback(
-            // Potentially acquire mount point list and interpret it
-            Futures.transformAsync(futureSchema,
-                result -> Futures.transform(createMountPointContext(result.modelContext(), baseSchema, listener),
-                    mount -> new NetconfDeviceSchema(result.capabilities(), mount), processingExecutor),
-                processingExecutor),
-            new FutureCallback<>() {
+        // Potentially acquire mount point list and interpret it
+        final var netconfDeviceSchemaFuture = Futures.transformAsync(futureSchema,
+            result -> Futures.transform(createMountPointContext(result.modelContext(), baseSchema, listener),
+                mount -> new NetconfDeviceSchema(result.capabilities(), mount), processingExecutor),
+            processingExecutor);
+        schemaFuturesList = Futures.allAsList(sourceResolverFuture, futureSchema, netconfDeviceSchemaFuture);
+
+        Futures.addCallback(netconfDeviceSchemaFuture, new FutureCallback<>() {
                 @Override
                 public void onSuccess(final NetconfDeviceSchema result) {
                     handleSalInitializationSuccess(listener, result, remoteSessionCapabilities,
@@ -165,7 +172,13 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
 
                 @Override
                 public void onFailure(final Throwable cause) {
-                    handleSalInitializationFailure(listener, cause);
+                    // The method onRemoteSessionDown was called while the EffectiveModelContext for the device
+                    // was being processed.
+                    if (cause instanceof CancellationException) {
+                        LOG.warn("{}: Device communicator was tear down since the schema setup started", id);
+                    } else {
+                        handleSalInitializationFailure(listener, cause);
+                    }
                 }
             }, MoreExecutors.directExecutor());
     }
@@ -175,9 +188,11 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         // TODO check whether the model describing create subscription is present in schema
         // Perhaps add a default schema context to support create-subscription if the model was not provided
         // (same as what we do for base netconf operations in transformer)
-        final var rpcResultListenableFuture = deviceRpc.invokeRpc(
-                NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME,
-                NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
+        final var rpcResultListenableFuture = deviceRpc.domRpcService()
+            .invokeRpc(CreateSubscription.QNAME, Builders.containerBuilder()
+                .withNodeIdentifier(NodeIdentifier.create(CreateSubscriptionInput.QNAME))
+                // Note: default 'stream' is 'NETCONF', we do not need to create an explicit leaf
+                .build());
 
         Futures.addCallback(rpcResultListenableFuture, new FutureCallback<DOMRpcResult>() {
             @Override
@@ -238,6 +253,11 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
 
     private synchronized void cleanupInitialization() {
         connected = false;
+        if (schemaFuturesList != null && !schemaFuturesList.isDone()) {
+            if (!schemaFuturesList.cancel(true)) {
+                LOG.warn("The cleanup of Schema Futures for device {} was unsuccessful.", id);
+            }
+        }
         notificationHandler.onRemoteSchemaDown();
         sourceRegistrations.forEach(Registration::close);
         sourceRegistrations.clear();
@@ -269,10 +289,10 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         final NetconfDeviceRpc deviceRpc = new NetconfDeviceRpc(schemaContext, listener,
             new NetconfMessageTransformer(emptyContext, false, baseSchema));
 
-        return Futures.transform(deviceRpc.invokeRpc(NetconfMessageTransformUtil.NETCONF_GET_QNAME,
-            Builders.containerBuilder().withNodeIdentifier(NETCONF_GET_NODEID)
-                .withChild(NetconfMessageTransformUtil.toFilterStructure(RFC8528_SCHEMA_MOUNTS, schemaContext))
-                .build()), rpcResult -> processSchemaMounts(rpcResult, emptyContext), MoreExecutors.directExecutor());
+        return Futures.transform(deviceRpc.domRpcService().invokeRpc(Get.QNAME, ImmutableNodes.newContainerBuilder()
+            .withNodeIdentifier(NETCONF_GET_NODEID)
+            .withChild(NetconfMessageTransformUtil.toFilterStructure(RFC8528_SCHEMA_MOUNTS, schemaContext))
+            .build()), rpcResult -> processSchemaMounts(rpcResult, emptyContext), MoreExecutors.directExecutor());
     }
 
     private MountPointContext processSchemaMounts(final DOMRpcResult rpcResult, final MountPointContext emptyContext) {
@@ -306,7 +326,7 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
 
     protected NetconfDeviceRpc getDeviceSpecificRpc(final MountPointContext result,
             final RemoteDeviceCommunicator listener, final BaseSchema schema) {
-        return new NetconfDeviceRpc(result.getEffectiveModelContext(), listener,
+        return new NetconfDeviceRpc(result.modelContext(), listener,
             new NetconfMessageTransformer(result, true, schema));
     }
 
@@ -394,10 +414,10 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
             if (remoteSessionCapabilities.containsNonModuleCapability(CapabilityURN.NOTIFICATION)) {
                 // FIXME: mutable collection modification!
                 deviceSources.getRequiredSourcesQName().addAll(List.of(
-                    org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714
-                        .$YangModuleInfoImpl.getInstance().getName(),
-                    org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715
-                        .$YangModuleInfoImpl.getInstance().getName())
+                    org.opendaylight.yang.svc.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714
+                        .YangModuleInfoImpl.getInstance().getName(),
+                    org.opendaylight.yang.svc.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715
+                        .YangModuleInfoImpl.getInstance().getName())
                 );
             }
 
@@ -473,7 +493,7 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         private List<SourceIdentifier> filterMissingSources(final Collection<SourceIdentifier> origSources) {
             return origSources.parallelStream().filter(sourceIdentifier -> {
                 try {
-                    schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class).get();
+                    schemaRepository.getSchemaSource(sourceIdentifier, YangTextSource.class).get();
                     return false;
                 } catch (InterruptedException | ExecutionException e) {
                     return true;
@@ -490,7 +510,7 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
         private List<SourceIdentifier> handleMissingSchemaSourceException(
                 final MissingSchemaSourceException exception) {
             // In case source missing, try without it
-            final SourceIdentifier missingSource = exception.getSourceId();
+            final SourceIdentifier missingSource = exception.sourceId();
             LOG.warn("{}: Unable to build schema context, missing source {}, will reattempt without it",
                 id, missingSource);
             LOG.debug("{}: Unable to build schema context, missing source {}, will reattempt without it",
@@ -507,16 +527,16 @@ public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
             // In case resolution error, try only with resolved sources
             // There are two options why schema resolution exception occurred : unsatisfied imports or flawed model
             // FIXME Do we really have assurance that these two cases cannot happen at once?
-            if (resolutionException.getFailedSource() != null) {
+            final var failedSourceId = resolutionException.sourceId();
+            if (failedSourceId != null) {
                 // flawed model - exclude it
-                final SourceIdentifier failedSourceId = resolutionException.getFailedSource();
                 LOG.warn("{}: Unable to build schema context, failed to resolve source {}, will reattempt without it",
                     id, failedSourceId);
                 LOG.warn("{}: Unable to build schema context, failed to resolve source {}, will reattempt without it",
                     id, failedSourceId, resolutionException);
                 addUnresolvedCapabilities(getQNameFromSourceIdentifiers(List.of(failedSourceId)),
                         UnavailableCapability.FailureReason.UnableToResolve);
-                return stripUnavailableSource(resolutionException.getFailedSource());
+                return stripUnavailableSource(failedSourceId);
             }
             // unsatisfied imports
             addUnresolvedCapabilities(