Move DeviceSources registration
[netconf.git] / netconf / sal-netconf-connector / src / main / java / org / opendaylight / netconf / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.connect.netconf;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_NODEID;
13
14 import com.google.common.base.Predicates;
15 import com.google.common.collect.Collections2;
16 import com.google.common.collect.Sets;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.ListeningExecutorService;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import io.netty.util.concurrent.EventExecutor;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Objects;
30 import java.util.Optional;
31 import java.util.Set;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.stream.Collectors;
35 import org.checkerframework.checker.lock.qual.GuardedBy;
36 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
37 import org.opendaylight.mdsal.dom.api.DOMRpcService;
38 import org.opendaylight.netconf.api.NetconfMessage;
39 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
40 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
41 import org.opendaylight.netconf.sal.connect.api.MessageTransformer;
42 import org.opendaylight.netconf.sal.connect.api.NetconfDeviceSchemasResolver;
43 import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
44 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
45 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
46 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
47 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
48 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
49 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc;
50 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
51 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseSchema;
52 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
53 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
54 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
55 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev190614.NetconfNodeAugmentedOptional;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapabilityBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
60 import org.opendaylight.yangtools.concepts.Registration;
61 import org.opendaylight.yangtools.rfc8528.data.api.MountPointContext;
62 import org.opendaylight.yangtools.rfc8528.data.util.EmptyMountPointContext;
63 import org.opendaylight.yangtools.rfc8528.model.api.SchemaMountConstants;
64 import org.opendaylight.yangtools.yang.common.QName;
65 import org.opendaylight.yangtools.yang.common.RpcError;
66 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
70 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
71 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
72 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
73 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
74 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
75 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
76 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
77 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
78 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 /**
83  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade.
84  */
85 public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
86     private static final Logger LOG = LoggerFactory.getLogger(NetconfDevice.class);
87
88     private static final QName RFC8528_SCHEMA_MOUNTS_QNAME = QName.create(
89         SchemaMountConstants.RFC8528_MODULE, "schema-mounts").intern();
90     private static final YangInstanceIdentifier RFC8528_SCHEMA_MOUNTS = YangInstanceIdentifier.create(
91         NodeIdentifier.create(RFC8528_SCHEMA_MOUNTS_QNAME));
92
93     protected final RemoteDeviceId id;
94     protected final EffectiveModelContextFactory schemaContextFactory;
95     protected final SchemaSourceRegistry schemaRegistry;
96     protected final SchemaRepository schemaRepository;
97
98     protected final List<Registration> sourceRegistrations = new ArrayList<>();
99
100     private final RemoteDeviceHandler salFacade;
101     private final ListeningExecutorService processingExecutor;
102     private final DeviceActionFactory deviceActionFactory;
103     private final NetconfDeviceSchemasResolver stateSchemasResolver;
104     private final NotificationHandler notificationHandler;
105     private final boolean reconnectOnSchemasChange;
106     private final BaseNetconfSchemas baseSchemas;
107     private final NetconfNode node;
108     private final EventExecutor eventExecutor;
109     private final NetconfNodeAugmentedOptional nodeOptional;
110
111     @GuardedBy("this")
112     private boolean connected = false;
113
114     // Message transformer is constructed once the schemas are available
115     private MessageTransformer messageTransformer;
116
117     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas,
118             final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
119             final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) {
120         this(schemaResourcesDTO, baseSchemas, id, salFacade, globalProcessingExecutor, reconnectOnSchemasChange, null,
121             null, null, null);
122     }
123
124     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas,
125             final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
126             final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange,
127             final DeviceActionFactory deviceActionFactory, final NetconfNode node, final EventExecutor eventExecutor,
128             final NetconfNodeAugmentedOptional nodeOptional) {
129         this.baseSchemas = requireNonNull(baseSchemas);
130         this.id = id;
131         this.reconnectOnSchemasChange = reconnectOnSchemasChange;
132         this.deviceActionFactory = deviceActionFactory;
133         this.node = node;
134         this.eventExecutor = eventExecutor;
135         this.nodeOptional = nodeOptional;
136         schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
137         schemaRepository = schemaResourcesDTO.getSchemaRepository();
138         schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
139         this.salFacade = salFacade;
140         stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver();
141         processingExecutor = requireNonNull(globalProcessingExecutor);
142         notificationHandler = new NotificationHandler(salFacade, id);
143     }
144
145     @Override
146     public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
147                                   final NetconfDeviceCommunicator listener) {
148         // SchemaContext setup has to be performed in a dedicated thread since
149         // we are in a netty thread in this method
150         // Yang models are being downloaded in this method and it would cause a
151         // deadlock if we used the netty thread
152         // http://netty.io/wiki/thread-model.html
153         setConnected(true);
154         LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
155
156         final BaseSchema baseSchema = resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported());
157         final NetconfDeviceRpc initRpc = new NetconfDeviceRpc(baseSchema.getEffectiveModelContext(), listener,
158             new NetconfMessageTransformer(baseSchema.getMountPointContext(), false, baseSchema));
159         final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(
160             new DeviceSourcesResolver(id, baseSchema, initRpc, remoteSessionCapabilities, stateSchemasResolver));
161
162         if (shouldListenOnSchemaChange(remoteSessionCapabilities)) {
163             registerToBaseNetconfStream(initRpc, listener);
164         }
165
166         // Set up the SchemaContext for the device
167         final ListenableFuture<EffectiveModelContext> futureSchema = Futures.transformAsync(sourceResolverFuture,
168             deviceSources -> assembleSchemaContext(deviceSources, remoteSessionCapabilities), processingExecutor);
169
170         // Potentially acquire mount point list and interpret it
171         final ListenableFuture<MountPointContext> futureContext = Futures.transformAsync(futureSchema,
172             schemaContext -> createMountPointContext(schemaContext, baseSchema, listener), processingExecutor);
173
174         Futures.addCallback(futureContext, new FutureCallback<MountPointContext>() {
175             @Override
176             public void onSuccess(final MountPointContext result) {
177                 handleSalInitializationSuccess(result, remoteSessionCapabilities,
178                         getDeviceSpecificRpc(result, listener, baseSchema), listener);
179             }
180
181             @Override
182             public void onFailure(final Throwable cause) {
183                 LOG.warn("{}: Unexpected error resolving device sources", id, cause);
184
185                 // No more sources, fail or try to reconnect
186                 if (cause instanceof EmptySchemaContextException) {
187                     if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
188                         eventExecutor.schedule(() -> {
189                             LOG.warn("Reconnection is allowed! This can lead to unexpected errors at runtime.");
190                             LOG.warn("{} : No more sources for schema context.", id);
191                             LOG.info("{} : Try to remount device.", id);
192                             onRemoteSessionDown();
193                             salFacade.onDeviceReconnected(remoteSessionCapabilities, node);
194                         }, nodeOptional.getIgnoreMissingSchemaSources().getReconnectTime().toJava(),
195                             TimeUnit.MILLISECONDS);
196                         return;
197                     }
198                 }
199
200                 handleSalInitializationFailure(cause, listener);
201                 salFacade.onDeviceFailed(cause);
202             }
203         }, MoreExecutors.directExecutor());
204     }
205
206     private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc,
207                                              final NetconfDeviceCommunicator listener) {
208         // TODO check whether the model describing create subscription is present in schema
209         // Perhaps add a default schema context to support create-subscription if the model was not provided
210         // (same as what we do for base netconf operations in transformer)
211         final ListenableFuture<DOMRpcResult> rpcResultListenableFuture = deviceRpc.invokeRpc(
212                 NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME,
213                 NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
214
215         Futures.addCallback(rpcResultListenableFuture, new FutureCallback<DOMRpcResult>() {
216             @Override
217             public void onSuccess(final DOMRpcResult domRpcResult) {
218                 notificationHandler.addNotificationFilter(notification -> {
219                     if (NetconfCapabilityChange.QNAME.equals(notification.getBody().getIdentifier().getNodeType())) {
220                         LOG.info("{}: Schemas change detected, reconnecting", id);
221                         // Only disconnect is enough,
222                         // the reconnecting nature of the connector will take care of reconnecting
223                         listener.disconnect();
224                         return Optional.empty();
225                     }
226                     return Optional.of(notification);
227                 });
228             }
229
230             @Override
231             public void onFailure(final Throwable throwable) {
232                 LOG.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly",
233                         throwable);
234             }
235         }, MoreExecutors.directExecutor());
236     }
237
238     private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) {
239         return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
240     }
241
242     private synchronized void handleSalInitializationSuccess(final MountPointContext result,
243             final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc,
244             final RemoteDeviceCommunicator listener) {
245         //NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing,
246         //since salFacade.onDeviceDisconnected was already called.
247         if (connected) {
248             messageTransformer = new NetconfMessageTransformer(result, true,
249                 resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported()));
250
251             // salFacade.onDeviceConnected has to be called before the notification handler is initialized
252             salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc,
253                     deviceActionFactory == null ? null : deviceActionFactory.createDeviceAction(
254                             messageTransformer, listener, result.getEffectiveModelContext()));
255             notificationHandler.onRemoteSchemaUp(messageTransformer);
256
257             LOG.info("{}: Netconf connector initialized successfully", id);
258         } else {
259             LOG.warn("{}: Device communicator was closed before schema setup finished.", id);
260         }
261     }
262
263     private void handleSalInitializationFailure(final Throwable throwable, final RemoteDeviceCommunicator listener) {
264         LOG.error("{}: Initialization in sal failed, disconnecting from device", id, throwable);
265         listener.close();
266         onRemoteSessionDown();
267         resetMessageTransformer();
268     }
269
270     /**
271      * Set the transformer to null as is in initial state.
272      */
273     private void resetMessageTransformer() {
274         updateTransformer(null);
275     }
276
277     private synchronized void updateTransformer(final MessageTransformer transformer) {
278         messageTransformer = transformer;
279     }
280
281     private synchronized void setConnected(final boolean connected) {
282         this.connected = connected;
283     }
284
285     private ListenableFuture<EffectiveModelContext> assembleSchemaContext(final DeviceSources deviceSources,
286             final NetconfSessionPreferences remoteSessionCapabilities) {
287         LOG.debug("{}: Resolved device sources to {}", id, deviceSources);
288
289         sourceRegistrations.addAll(deviceSources.register(schemaRegistry));
290
291         return new SchemaSetup(deviceSources, remoteSessionCapabilities).startResolution();
292     }
293
294     private ListenableFuture<MountPointContext> createMountPointContext(final EffectiveModelContext schemaContext,
295             final BaseSchema baseSchema, final NetconfDeviceCommunicator listener) {
296         final MountPointContext emptyContext = new EmptyMountPointContext(schemaContext);
297         if (schemaContext.findModule(SchemaMountConstants.RFC8528_MODULE).isEmpty()) {
298             return Futures.immediateFuture(emptyContext);
299         }
300
301         // Create a temporary RPC invoker and acquire the mount point tree
302         LOG.debug("{}: Acquiring available mount points", id);
303         final NetconfDeviceRpc deviceRpc = new NetconfDeviceRpc(schemaContext, listener,
304             new NetconfMessageTransformer(emptyContext, false, baseSchema));
305
306         return Futures.transform(deviceRpc.invokeRpc(NetconfMessageTransformUtil.NETCONF_GET_QNAME,
307             Builders.containerBuilder().withNodeIdentifier(NETCONF_GET_NODEID)
308                 .withChild(NetconfMessageTransformUtil.toFilterStructure(RFC8528_SCHEMA_MOUNTS, schemaContext))
309                 .build()), rpcResult -> processSchemaMounts(rpcResult, emptyContext), MoreExecutors.directExecutor());
310     }
311
312     private MountPointContext processSchemaMounts(final DOMRpcResult rpcResult, final MountPointContext emptyContext) {
313         final Collection<? extends RpcError> errors = rpcResult.getErrors();
314         if (!errors.isEmpty()) {
315             LOG.warn("{}: Schema-mounts acquisition resulted in errors {}", id, errors);
316         }
317         final NormalizedNode schemaMounts = rpcResult.getResult();
318         if (schemaMounts == null) {
319             LOG.debug("{}: device does not define any schema mounts", id);
320             return emptyContext;
321         }
322         if (!(schemaMounts instanceof ContainerNode)) {
323             LOG.warn("{}: ignoring non-container schema mounts {}", id, schemaMounts);
324             return emptyContext;
325         }
326
327         return DeviceMountPointContext.create(emptyContext, (ContainerNode) schemaMounts);
328     }
329
330     @Override
331     public void onRemoteSessionDown() {
332         setConnected(false);
333         notificationHandler.onRemoteSchemaDown();
334
335         salFacade.onDeviceDisconnected();
336         sourceRegistrations.forEach(Registration::close);
337         sourceRegistrations.clear();
338         resetMessageTransformer();
339     }
340
341     @Override
342     public void onRemoteSessionFailed(final Throwable throwable) {
343         setConnected(false);
344         salFacade.onDeviceFailed(throwable);
345     }
346
347     @Override
348     public void onNotification(final NetconfMessage notification) {
349         notificationHandler.handleNotification(notification);
350     }
351
352     private BaseSchema resolveBaseSchema(final boolean notificationSupport) {
353         return notificationSupport ? baseSchemas.getBaseSchemaWithNotifications() : baseSchemas.getBaseSchema();
354     }
355
356     protected NetconfDeviceRpc getDeviceSpecificRpc(final MountPointContext result,
357             final RemoteDeviceCommunicator listener, final BaseSchema schema) {
358         return new NetconfDeviceRpc(result.getEffectiveModelContext(), listener,
359             new NetconfMessageTransformer(result, true, schema));
360     }
361
362     /**
363      * Just a transfer object containing schema related dependencies. Injected in constructor.
364      */
365     public static class SchemaResourcesDTO {
366         private final SchemaSourceRegistry schemaRegistry;
367         private final SchemaRepository schemaRepository;
368         private final EffectiveModelContextFactory schemaContextFactory;
369         private final NetconfDeviceSchemasResolver stateSchemasResolver;
370
371         public SchemaResourcesDTO(final SchemaSourceRegistry schemaRegistry,
372                                   final SchemaRepository schemaRepository,
373                                   final EffectiveModelContextFactory schemaContextFactory,
374                                   final NetconfDeviceSchemasResolver deviceSchemasResolver) {
375             this.schemaRegistry = requireNonNull(schemaRegistry);
376             this.schemaRepository = requireNonNull(schemaRepository);
377             this.schemaContextFactory = requireNonNull(schemaContextFactory);
378             stateSchemasResolver = requireNonNull(deviceSchemasResolver);
379         }
380
381         public SchemaSourceRegistry getSchemaRegistry() {
382             return schemaRegistry;
383         }
384
385         public SchemaRepository getSchemaRepository() {
386             return schemaRepository;
387         }
388
389         public EffectiveModelContextFactory getSchemaContextFactory() {
390             return schemaContextFactory;
391         }
392
393         public NetconfDeviceSchemasResolver getStateSchemasResolver() {
394             return stateSchemasResolver;
395         }
396     }
397
398     /**
399      * A dedicated exception to indicate when we fail to setup a SchemaContext.
400      *
401      * @author Robert Varga
402      */
403     private static final class EmptySchemaContextException extends Exception {
404         private static final long serialVersionUID = 1L;
405
406         EmptySchemaContextException(final String message) {
407             super(message);
408         }
409     }
410
411     /**
412      * Schema builder that tries to build schema context from provided sources or biggest subset of it.
413      */
414     private final class SchemaSetup implements FutureCallback<EffectiveModelContext> {
415         private final SettableFuture<EffectiveModelContext> resultFuture = SettableFuture.create();
416
417         private final DeviceSources deviceSources;
418         private final NetconfSessionPreferences remoteSessionCapabilities;
419         private final NetconfDeviceCapabilities capabilities;
420
421         private Collection<SourceIdentifier> requiredSources;
422
423         SchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities) {
424             this.deviceSources = deviceSources;
425             this.remoteSessionCapabilities = remoteSessionCapabilities;
426             capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
427
428             // If device supports notifications and does not contain necessary modules, add them automatically
429             if (remoteSessionCapabilities.containsNonModuleCapability(
430                     XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_NOTIFICATION_1_0)) {
431                 deviceSources.getRequiredSourcesQName().addAll(
432                         Arrays.asList(
433                                 org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714
434                                         .$YangModuleInfoImpl.getInstance().getName(),
435                                 org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715
436                                         .$YangModuleInfoImpl.getInstance().getName()
437                         )
438                 );
439             }
440
441             requiredSources = deviceSources.getRequiredSources();
442             final Collection<SourceIdentifier> missingSources = filterMissingSources(requiredSources);
443
444             capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(missingSources),
445                     UnavailableCapability.FailureReason.MissingSource);
446             requiredSources.removeAll(missingSources);
447         }
448
449         ListenableFuture<EffectiveModelContext> startResolution() {
450             trySetupSchema();
451             return resultFuture;
452         }
453
454         @Override
455         public void onSuccess(final EffectiveModelContext result) {
456             LOG.debug("{}: Schema context built successfully from {}", id, requiredSources);
457
458             final Collection<QName> filteredQNames = Sets.difference(deviceSources.getRequiredSourcesQName(),
459                     capabilities.getUnresolvedCapabilites().keySet());
460             capabilities.addCapabilities(filteredQNames.stream().map(entry -> new AvailableCapabilityBuilder()
461                     .setCapability(entry.toString()).setCapabilityOrigin(
462                             remoteSessionCapabilities.getModuleBasedCapsOrigin().get(entry)).build())
463                     .collect(Collectors.toList()));
464
465             capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities
466                     .getNonModuleCaps().stream().map(entry -> new AvailableCapabilityBuilder()
467                             .setCapability(entry).setCapabilityOrigin(
468                                     remoteSessionCapabilities.getNonModuleBasedCapsOrigin().get(entry)).build())
469                     .collect(Collectors.toList()));
470
471             resultFuture.set(result);
472         }
473
474         @Override
475         public void onFailure(final Throwable cause) {
476             // schemaBuilderFuture.checkedGet() throws only SchemaResolutionException
477             // that might be wrapping a MissingSchemaSourceException so we need to look
478             // at the cause of the exception to make sure we don't misinterpret it.
479             if (cause instanceof MissingSchemaSourceException) {
480                 requiredSources = handleMissingSchemaSourceException((MissingSchemaSourceException) cause);
481             } else if (cause instanceof SchemaResolutionException) {
482                 requiredSources = handleSchemaResolutionException((SchemaResolutionException) cause);
483             } else {
484                 LOG.debug("Unhandled failure", cause);
485                 resultFuture.setException(cause);
486                 // No more trying...
487                 return;
488             }
489
490             trySetupSchema();
491         }
492
493         private void trySetupSchema() {
494             if (!requiredSources.isEmpty()) {
495                 // Initiate async resolution, drive it back based on the result
496                 LOG.trace("{}: Trying to build schema context from {}", id, requiredSources);
497                 Futures.addCallback(schemaContextFactory.createEffectiveModelContext(requiredSources), this,
498                     MoreExecutors.directExecutor());
499             } else {
500                 LOG.debug("{}: no more sources for schema context", id);
501                 resultFuture.setException(new EmptySchemaContextException(id + ": No more sources for schema context"));
502             }
503         }
504
505         private List<SourceIdentifier> filterMissingSources(final Collection<SourceIdentifier> origSources) {
506             return origSources.parallelStream().filter(sourceIdentifier -> {
507                 try {
508                     schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class).get();
509                     return false;
510                 } catch (InterruptedException | ExecutionException e) {
511                     return true;
512                 }
513             }).collect(Collectors.toList());
514         }
515
516         private List<SourceIdentifier> handleMissingSchemaSourceException(
517                 final MissingSchemaSourceException exception) {
518             // In case source missing, try without it
519             final SourceIdentifier missingSource = exception.getSourceId();
520             LOG.warn("{}: Unable to build schema context, missing source {}, will reattempt without it",
521                 id, missingSource);
522             LOG.debug("{}: Unable to build schema context, missing source {}, will reattempt without it",
523                 id, missingSource, exception);
524             final Collection<QName> qNameOfMissingSource =
525                 getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource));
526             if (!qNameOfMissingSource.isEmpty()) {
527                 capabilities.addUnresolvedCapabilities(
528                         qNameOfMissingSource, UnavailableCapability.FailureReason.MissingSource);
529             }
530             return stripUnavailableSource(missingSource);
531         }
532
533         private Collection<SourceIdentifier> handleSchemaResolutionException(
534                 final SchemaResolutionException resolutionException) {
535             // In case resolution error, try only with resolved sources
536             // There are two options why schema resolution exception occurred : unsatisfied imports or flawed model
537             // FIXME Do we really have assurance that these two cases cannot happen at once?
538             if (resolutionException.getFailedSource() != null) {
539                 // flawed model - exclude it
540                 final SourceIdentifier failedSourceId = resolutionException.getFailedSource();
541                 LOG.warn("{}: Unable to build schema context, failed to resolve source {}, will reattempt without it",
542                     id, failedSourceId);
543                 LOG.warn("{}: Unable to build schema context, failed to resolve source {}, will reattempt without it",
544                     id, failedSourceId, resolutionException);
545                 capabilities.addUnresolvedCapabilities(
546                         getQNameFromSourceIdentifiers(Collections.singleton(failedSourceId)),
547                         UnavailableCapability.FailureReason.UnableToResolve);
548                 return stripUnavailableSource(resolutionException.getFailedSource());
549             }
550             // unsatisfied imports
551             final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
552             capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources),
553                 UnavailableCapability.FailureReason.UnableToResolve);
554             LOG.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only",
555                 id, resolutionException.getUnsatisfiedImports());
556             LOG.debug("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only",
557                 id, resolutionException.getUnsatisfiedImports(), resolutionException);
558             return resolutionException.getResolvedSources();
559         }
560
561         private List<SourceIdentifier> stripUnavailableSource(final SourceIdentifier sourceIdToRemove) {
562             final var tmp = new ArrayList<>(requiredSources);
563             checkState(tmp.remove(sourceIdToRemove), "%s: Trying to remove %s from %s failed", id, sourceIdToRemove,
564                 requiredSources);
565             return tmp;
566         }
567
568         private Collection<QName> getQNameFromSourceIdentifiers(final Collection<SourceIdentifier> identifiers) {
569             final Collection<QName> qNames = Collections2.transform(identifiers, this::getQNameFromSourceIdentifier);
570
571             if (qNames.isEmpty()) {
572                 LOG.debug("{}: Unable to map any source identifiers to a capability reported by device : {}", id,
573                         identifiers);
574             }
575             return Collections2.filter(qNames, Predicates.notNull());
576         }
577
578         private QName getQNameFromSourceIdentifier(final SourceIdentifier identifier) {
579             // Required sources are all required and provided merged in DeviceSourcesResolver
580             for (final QName qname : deviceSources.getRequiredSourcesQName()) {
581                 if (!qname.getLocalName().equals(identifier.name().getLocalName())) {
582                     continue;
583                 }
584
585                 if (Objects.equals(identifier.revision(), qname.getRevision().orElse(null))) {
586                     return qname;
587                 }
588             }
589             LOG.warn("Unable to map identifier to a devices reported capability: {} Available: {}",identifier,
590                     deviceSources.getRequiredSourcesQName());
591             // return null since we cannot find the QName,
592             // this capability will be removed from required sources and not reported as unresolved-capability
593             return null;
594         }
595     }
596 }