Untangle NetconfDevice setup 45/84745/4
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 26 Sep 2019 12:08:40 +0000 (14:08 +0200)
committerRobert Varga <nite@hq.sk>
Sat, 5 Oct 2019 14:01:49 +0000 (14:01 +0000)
NetconfDevice sets up the device connection in multiple stages,
which is all done in a messy way. Untangle the setup procedure,
so that we can properly hook up into it.

Change-Id: I612683153c58d0b319793440cbb06bfad7846e52
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java

index 88e5af62b14395d9bdda6146d880f640029f3ec4..8a16ed177e4bad7ab76cc5968c3fee6f6020f64b 100644 (file)
@@ -12,13 +12,13 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.netty.util.concurrent.EventExecutor;
 import java.util.ArrayList;
@@ -153,25 +153,48 @@ public class NetconfDevice
             registerToBaseNetconfStream(initRpc, listener);
         }
 
-        final FutureCallback<DeviceSources> resolvedSourceCallback = new FutureCallback<DeviceSources>() {
-            @Override
-            public void onSuccess(final DeviceSources result) {
-                addProvidedSourcesToSchemaRegistry(result);
-                setUpSchema(result);
-            }
+        // Set up the SchemaContext for the device
+        final ListenableFuture<SchemaContext> futureSchema = Futures.transformAsync(sourceResolverFuture, schemas -> {
+            LOG.debug("{}: Resolved device sources to {}", id, schemas);
+            addProvidedSourcesToSchemaRegistry(schemas);
+            return new SchemaSetup(schemas, remoteSessionCapabilities).startResolution();
+        }, processingExecutor);
 
-            private void setUpSchema(final DeviceSources result) {
-                processingExecutor.submit(new SchemaSetup(result, remoteSessionCapabilities, listener));
-            }
+        // Potentially acquire mount point list and interpret it
+        final ListenableFuture<MountPointContext> futureContext = Futures.transform(futureSchema, schemaContext -> {
+            // FIXME: check if there is RFC8528 schema available
+            return new EmptyMountPointContext(schemaContext);
+        }, processingExecutor);
 
+        Futures.addCallback(futureContext, new FutureCallback<MountPointContext>() {
             @Override
-            public void onFailure(final Throwable throwable) {
-                LOG.warn("{}: Unexpected error resolving device sources", id, throwable);
-                handleSalInitializationFailure(throwable, listener);
+            public void onSuccess(final MountPointContext result) {
+                handleSalInitializationSuccess(result, remoteSessionCapabilities,
+                    getDeviceSpecificRpc(result, listener), listener);
             }
-        };
 
-        Futures.addCallback(sourceResolverFuture, resolvedSourceCallback, MoreExecutors.directExecutor());
+            @Override
+            public void onFailure(final Throwable cause) {
+                LOG.warn("{}: Unexpected error resolving device sources", id, cause);
+
+                // No more sources, fail or try to reconnect
+                if (cause instanceof EmptySchemaContextException) {
+                    if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().isAllowed()) {
+                        eventExecutor.schedule(() -> {
+                            LOG.warn("Reconnection is allowed! This can lead to unexpected errors at runtime.");
+                            LOG.warn("{} : No more sources for schema context.", id);
+                            LOG.info("{} : Try to remount device.", id);
+                            onRemoteSessionDown();
+                            salFacade.onDeviceReconnected(remoteSessionCapabilities, node);
+                        }, nodeOptional.getIgnoreMissingSchemaSources().getReconnectTime(), TimeUnit.MILLISECONDS);
+                        return;
+                    }
+                }
+
+                handleSalInitializationFailure(cause, listener);
+                salFacade.onDeviceFailed(cause);
+            }
+        }, MoreExecutors.directExecutor());
     }
 
     private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc,
@@ -292,6 +315,11 @@ public class NetconfDevice
         return notificationSupport ? BaseSchema.BASE_NETCONF_CTX_WITH_NOTIFICATIONS : BaseSchema.BASE_NETCONF_CTX;
     }
 
+    protected NetconfDeviceRpc getDeviceSpecificRpc(final MountPointContext result,
+            final RemoteDeviceCommunicator<NetconfMessage> listener) {
+        return new NetconfDeviceRpc(result.getSchemaContext(), listener, new NetconfMessageTransformer(result, true));
+    }
+
     /**
      * Just a transfer object containing schema related dependencies. Injected in constructor.
      */
@@ -328,38 +356,102 @@ public class NetconfDevice
         }
     }
 
+    /**
+     * A dedicated exception to indicate when we fail to setup a SchemaContext.
+     *
+     * @author Robert Varga
+     */
+    private static final class EmptySchemaContextException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        EmptySchemaContextException(final String message) {
+            super(message);
+        }
+    }
+
     /**
      * Schema builder that tries to build schema context from provided sources or biggest subset of it.
      */
-    private final class SchemaSetup implements Runnable {
+    private final class SchemaSetup implements FutureCallback<SchemaContext> {
+        private final SettableFuture<SchemaContext> resultFuture = SettableFuture.create();
+
         private final DeviceSources deviceSources;
         private final NetconfSessionPreferences remoteSessionCapabilities;
-        private final RemoteDeviceCommunicator<NetconfMessage> listener;
         private final NetconfDeviceCapabilities capabilities;
 
-        SchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities,
-                           final RemoteDeviceCommunicator<NetconfMessage> listener) {
+        private Collection<SourceIdentifier> requiredSources;
+
+        SchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities) {
             this.deviceSources = deviceSources;
             this.remoteSessionCapabilities = remoteSessionCapabilities;
-            this.listener = listener;
             this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
-        }
-
-        @Override
-        public void run() {
 
-            final Collection<SourceIdentifier> requiredSources = deviceSources.getRequiredSources();
+            requiredSources = deviceSources.getRequiredSources();
             final Collection<SourceIdentifier> missingSources = filterMissingSources(requiredSources);
 
             capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(missingSources),
                     UnavailableCapability.FailureReason.MissingSource);
-
             requiredSources.removeAll(missingSources);
-            setUpSchema(requiredSources);
         }
 
-        private Collection<SourceIdentifier> filterMissingSources(final Collection<SourceIdentifier> requiredSources) {
-            return requiredSources.parallelStream().filter(sourceIdentifier -> {
+        ListenableFuture<SchemaContext> startResolution() {
+            trySetupSchema();
+            return resultFuture;
+        }
+
+        @Override
+        public void onSuccess(final SchemaContext result) {
+            LOG.debug("{}: Schema context built successfully from {}", id, requiredSources);
+
+            final Collection<QName> filteredQNames = Sets.difference(deviceSources.getRequiredSourcesQName(),
+                    capabilities.getUnresolvedCapabilites().keySet());
+            capabilities.addCapabilities(filteredQNames.stream().map(entry -> new AvailableCapabilityBuilder()
+                    .setCapability(entry.toString()).setCapabilityOrigin(
+                            remoteSessionCapabilities.getModuleBasedCapsOrigin().get(entry)).build())
+                    .collect(Collectors.toList()));
+
+            capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities
+                    .getNonModuleCaps().stream().map(entry -> new AvailableCapabilityBuilder()
+                            .setCapability(entry).setCapabilityOrigin(
+                                    remoteSessionCapabilities.getNonModuleBasedCapsOrigin().get(entry)).build())
+                    .collect(Collectors.toList()));
+
+            resultFuture.set(result);
+        }
+
+        @Override
+        public void onFailure(final Throwable cause) {
+            // schemaBuilderFuture.checkedGet() throws only SchemaResolutionException
+            // that might be wrapping a MissingSchemaSourceException so we need to look
+            // at the cause of the exception to make sure we don't misinterpret it.
+            if (cause instanceof MissingSchemaSourceException) {
+                requiredSources = handleMissingSchemaSourceException((MissingSchemaSourceException) cause);
+            } else if (cause instanceof SchemaResolutionException) {
+                requiredSources = handleSchemaResolutionException((SchemaResolutionException) cause);
+            } else {
+                LOG.debug("Unhandled failure", cause);
+                resultFuture.setException(cause);
+                // No more trying...
+                return;
+            }
+
+            trySetupSchema();
+        }
+
+        private void trySetupSchema() {
+            if (!requiredSources.isEmpty()) {
+                // Initiate async resolution, drive it back based on the result
+                LOG.trace("{}: Trying to build schema context from {}", id, requiredSources);
+                Futures.addCallback(schemaContextFactory.createSchemaContext(requiredSources), this,
+                    MoreExecutors.directExecutor());
+            } else {
+                LOG.debug("{}: no more sources for schema context", id);
+                resultFuture.setException(new EmptySchemaContextException(id + ": No more sources for schema context"));
+            }
+        }
+
+        private Collection<SourceIdentifier> filterMissingSources(final Collection<SourceIdentifier> origSources) {
+            return origSources.parallelStream().filter(sourceIdentifier -> {
                 try {
                     schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class).get();
                     return false;
@@ -369,80 +461,8 @@ public class NetconfDevice
             }).collect(Collectors.toList());
         }
 
-        /**
-         * Build schema context, in case of success or final failure notify device.
-         *
-         * @param requiredSources required sources
-         */
-        @SuppressWarnings("checkstyle:IllegalCatch")
-        private void setUpSchema(Collection<SourceIdentifier> requiredSources) {
-            while (!requiredSources.isEmpty()) {
-                LOG.trace("{}: Trying to build schema context from {}", id, requiredSources);
-                try {
-                    final ListenableFuture<SchemaContext> schemaBuilderFuture = schemaContextFactory
-                            .createSchemaContext(requiredSources);
-                    final SchemaContext result = schemaBuilderFuture.get();
-                    LOG.debug("{}: Schema context built successfully from {}", id, requiredSources);
-                    final Collection<QName> filteredQNames = Sets.difference(deviceSources.getRequiredSourcesQName(),
-                            capabilities.getUnresolvedCapabilites().keySet());
-                    capabilities.addCapabilities(filteredQNames.stream().map(entry -> new AvailableCapabilityBuilder()
-                            .setCapability(entry.toString()).setCapabilityOrigin(
-                                    remoteSessionCapabilities.getModuleBasedCapsOrigin().get(entry)).build())
-                            .collect(Collectors.toList()));
-
-                    capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities
-                            .getNonModuleCaps().stream().map(entry -> new AvailableCapabilityBuilder()
-                                    .setCapability(entry).setCapabilityOrigin(
-                                            remoteSessionCapabilities.getNonModuleBasedCapsOrigin().get(entry)).build())
-                            .collect(Collectors.toList()));
-
-                    final MountPointContext mountContext = new EmptyMountPointContext(result);
-                    handleSalInitializationSuccess(mountContext, remoteSessionCapabilities,
-                        getDeviceSpecificRpc(mountContext), listener);
-                    return;
-                } catch (final ExecutionException e) {
-                    // schemaBuilderFuture.checkedGet() throws only SchemaResolutionException
-                    // that might be wrapping a MissingSchemaSourceException so we need to look
-                    // at the cause of the exception to make sure we don't misinterpret it.
-                    final Throwable cause = e.getCause();
-
-                    if (cause instanceof MissingSchemaSourceException) {
-                        requiredSources = handleMissingSchemaSourceException(
-                                requiredSources, (MissingSchemaSourceException) cause);
-                        continue;
-                    }
-                    if (cause instanceof SchemaResolutionException) {
-                        requiredSources = handleSchemaResolutionException(requiredSources,
-                            (SchemaResolutionException) cause);
-                    } else {
-                        handleSalInitializationFailure(e, listener);
-                        return;
-                    }
-                } catch (final Exception e) {
-                    // unknown error, fail
-                    handleSalInitializationFailure(e, listener);
-                    return;
-                }
-            }
-            // No more sources, fail or try to reconnect
-            if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().isAllowed()) {
-                eventExecutor.schedule(() -> {
-                    LOG.warn("Reconnection is allowed! This can lead to unexpected errors at runtime.");
-                    LOG.warn("{} : No more sources for schema context.", id);
-                    LOG.info("{} : Try to remount device.", id);
-                    onRemoteSessionDown();
-                    salFacade.onDeviceReconnected(remoteSessionCapabilities, node);
-                }, nodeOptional.getIgnoreMissingSchemaSources().getReconnectTime(), TimeUnit.MILLISECONDS);
-            } else {
-                final IllegalStateException cause =
-                        new IllegalStateException(id + ": No more sources for schema context");
-                handleSalInitializationFailure(cause, listener);
-                salFacade.onDeviceFailed(cause);
-            }
-        }
-
         private Collection<SourceIdentifier> handleMissingSchemaSourceException(
-                final Collection<SourceIdentifier> requiredSources, final MissingSchemaSourceException exception) {
+                final MissingSchemaSourceException exception) {
             // In case source missing, try without it
             final SourceIdentifier missingSource = exception.getSourceId();
             LOG.warn("{}: Unable to build schema context, missing source {}, will reattempt without it",
@@ -455,11 +475,11 @@ public class NetconfDevice
                 capabilities.addUnresolvedCapabilities(
                         qNameOfMissingSource, UnavailableCapability.FailureReason.MissingSource);
             }
-            return stripUnavailableSource(requiredSources, missingSource);
+            return stripUnavailableSource(missingSource);
         }
 
         private Collection<SourceIdentifier> handleSchemaResolutionException(
-            final Collection<SourceIdentifier> requiredSources, final SchemaResolutionException resolutionException) {
+                final SchemaResolutionException resolutionException) {
             // In case resolution error, try only with resolved sources
             // There are two options why schema resolution exception occurred : unsatisfied imports or flawed model
             // FIXME Do we really have assurance that these two cases cannot happen at once?
@@ -473,7 +493,7 @@ public class NetconfDevice
                 capabilities.addUnresolvedCapabilities(
                         getQNameFromSourceIdentifiers(Collections.singleton(failedSourceId)),
                         UnavailableCapability.FailureReason.UnableToResolve);
-                return stripUnavailableSource(requiredSources, resolutionException.getFailedSource());
+                return stripUnavailableSource(resolutionException.getFailedSource());
             }
             // unsatisfied imports
             final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
@@ -486,14 +506,8 @@ public class NetconfDevice
             return resolutionException.getResolvedSources();
         }
 
-        protected NetconfDeviceRpc getDeviceSpecificRpc(final MountPointContext result) {
-            return new NetconfDeviceRpc(result.getSchemaContext(), listener,
-                new NetconfMessageTransformer(result, true));
-        }
-
-        private Collection<SourceIdentifier> stripUnavailableSource(final Collection<SourceIdentifier> requiredSources,
-                                                                    final SourceIdentifier sourceIdToRemove) {
-            final LinkedList<SourceIdentifier> sourceIdentifiers = Lists.newLinkedList(requiredSources);
+        private Collection<SourceIdentifier> stripUnavailableSource(final SourceIdentifier sourceIdToRemove) {
+            final LinkedList<SourceIdentifier> sourceIdentifiers = new LinkedList<>(requiredSources);
             checkState(sourceIdentifiers.remove(sourceIdToRemove),
                     "%s: Trying to remove %s from %s failed", id, sourceIdToRemove, requiredSources);
             return sourceIdentifiers;