Specialize RemoteDeviceHandler to NetconfSessionPreferences
[netconf.git] / netconf / sal-netconf-connector / src / main / java / org / opendaylight / netconf / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.connect.netconf;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12 import static org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil.NETCONF_GET_NODEID;
13
14 import com.google.common.base.Predicates;
15 import com.google.common.collect.Collections2;
16 import com.google.common.collect.Sets;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.ListeningExecutorService;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import io.netty.util.concurrent.EventExecutor;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Objects;
31 import java.util.Optional;
32 import java.util.Set;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.TimeUnit;
35 import java.util.stream.Collectors;
36 import org.checkerframework.checker.lock.qual.GuardedBy;
37 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
38 import org.opendaylight.mdsal.dom.api.DOMRpcService;
39 import org.opendaylight.netconf.api.NetconfMessage;
40 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
41 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
42 import org.opendaylight.netconf.sal.connect.api.MessageTransformer;
43 import org.opendaylight.netconf.sal.connect.api.NetconfDeviceSchemasResolver;
44 import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
45 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
46 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
47 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
48 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
49 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
50 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc;
51 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
52 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseSchema;
53 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
54 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
55 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
56 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev190614.NetconfNodeAugmentedOptional;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapabilityBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
61 import org.opendaylight.yangtools.rfc8528.data.api.MountPointContext;
62 import org.opendaylight.yangtools.rfc8528.data.util.EmptyMountPointContext;
63 import org.opendaylight.yangtools.rfc8528.model.api.SchemaMountConstants;
64 import org.opendaylight.yangtools.yang.common.QName;
65 import org.opendaylight.yangtools.yang.common.RpcError;
66 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
70 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
71 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
72 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
73 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
74 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
75 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
76 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
77 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
78 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
79 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceProvider;
80 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
81 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85 /**
86  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade.
87  */
88 public class NetconfDevice implements RemoteDevice<NetconfDeviceCommunicator> {
89     private static final Logger LOG = LoggerFactory.getLogger(NetconfDevice.class);
90
91     private static final QName RFC8528_SCHEMA_MOUNTS_QNAME = QName.create(
92         SchemaMountConstants.RFC8528_MODULE, "schema-mounts").intern();
93     private static final YangInstanceIdentifier RFC8528_SCHEMA_MOUNTS = YangInstanceIdentifier.create(
94         NodeIdentifier.create(RFC8528_SCHEMA_MOUNTS_QNAME));
95
96     protected final RemoteDeviceId id;
97     protected final EffectiveModelContextFactory schemaContextFactory;
98     protected final SchemaSourceRegistry schemaRegistry;
99     protected final SchemaRepository schemaRepository;
100
101     protected final List<SchemaSourceRegistration<?>> sourceRegistrations = new ArrayList<>();
102
103     private final RemoteDeviceHandler salFacade;
104     private final ListeningExecutorService processingExecutor;
105     private final DeviceActionFactory deviceActionFactory;
106     private final NetconfDeviceSchemasResolver stateSchemasResolver;
107     private final NotificationHandler notificationHandler;
108     private final boolean reconnectOnSchemasChange;
109     private final BaseNetconfSchemas baseSchemas;
110     private final NetconfNode node;
111     private final EventExecutor eventExecutor;
112     private final NetconfNodeAugmentedOptional nodeOptional;
113
114     @GuardedBy("this")
115     private boolean connected = false;
116
117     // Message transformer is constructed once the schemas are available
118     private MessageTransformer messageTransformer;
119
120     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas,
121             final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
122             final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) {
123         this(schemaResourcesDTO, baseSchemas, id, salFacade, globalProcessingExecutor, reconnectOnSchemasChange, null,
124             null, null, null);
125     }
126
127     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas,
128             final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
129             final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange,
130             final DeviceActionFactory deviceActionFactory, final NetconfNode node, final EventExecutor eventExecutor,
131             final NetconfNodeAugmentedOptional nodeOptional) {
132         this.baseSchemas = requireNonNull(baseSchemas);
133         this.id = id;
134         this.reconnectOnSchemasChange = reconnectOnSchemasChange;
135         this.deviceActionFactory = deviceActionFactory;
136         this.node = node;
137         this.eventExecutor = eventExecutor;
138         this.nodeOptional = nodeOptional;
139         schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
140         schemaRepository = schemaResourcesDTO.getSchemaRepository();
141         schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
142         this.salFacade = salFacade;
143         stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver();
144         processingExecutor = requireNonNull(globalProcessingExecutor);
145         notificationHandler = new NotificationHandler(salFacade, id);
146     }
147
148     @Override
149     public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
150                                   final NetconfDeviceCommunicator listener) {
151         // SchemaContext setup has to be performed in a dedicated thread since
152         // we are in a netty thread in this method
153         // Yang models are being downloaded in this method and it would cause a
154         // deadlock if we used the netty thread
155         // http://netty.io/wiki/thread-model.html
156         setConnected(true);
157         LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
158
159         final BaseSchema baseSchema = resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported());
160         final NetconfDeviceRpc initRpc = new NetconfDeviceRpc(baseSchema.getEffectiveModelContext(), listener,
161             new NetconfMessageTransformer(baseSchema.getMountPointContext(), false, baseSchema));
162         final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(
163             new DeviceSourcesResolver(id, baseSchema, initRpc, remoteSessionCapabilities, stateSchemasResolver));
164
165         if (shouldListenOnSchemaChange(remoteSessionCapabilities)) {
166             registerToBaseNetconfStream(initRpc, listener);
167         }
168
169         // Set up the SchemaContext for the device
170         final ListenableFuture<EffectiveModelContext> futureSchema = Futures.transformAsync(sourceResolverFuture,
171             deviceSources -> assembleSchemaContext(deviceSources, remoteSessionCapabilities), processingExecutor);
172
173         // Potentially acquire mount point list and interpret it
174         final ListenableFuture<MountPointContext> futureContext = Futures.transformAsync(futureSchema,
175             schemaContext -> createMountPointContext(schemaContext, baseSchema, listener), processingExecutor);
176
177         Futures.addCallback(futureContext, new FutureCallback<MountPointContext>() {
178             @Override
179             public void onSuccess(final MountPointContext result) {
180                 handleSalInitializationSuccess(result, remoteSessionCapabilities,
181                         getDeviceSpecificRpc(result, listener, baseSchema), listener);
182             }
183
184             @Override
185             public void onFailure(final Throwable cause) {
186                 LOG.warn("{}: Unexpected error resolving device sources", id, cause);
187
188                 // No more sources, fail or try to reconnect
189                 if (cause instanceof EmptySchemaContextException) {
190                     if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
191                         eventExecutor.schedule(() -> {
192                             LOG.warn("Reconnection is allowed! This can lead to unexpected errors at runtime.");
193                             LOG.warn("{} : No more sources for schema context.", id);
194                             LOG.info("{} : Try to remount device.", id);
195                             onRemoteSessionDown();
196                             salFacade.onDeviceReconnected(remoteSessionCapabilities, node);
197                         }, nodeOptional.getIgnoreMissingSchemaSources().getReconnectTime().toJava(),
198                             TimeUnit.MILLISECONDS);
199                         return;
200                     }
201                 }
202
203                 handleSalInitializationFailure(cause, listener);
204                 salFacade.onDeviceFailed(cause);
205             }
206         }, MoreExecutors.directExecutor());
207     }
208
209     private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc,
210                                              final NetconfDeviceCommunicator listener) {
211         // TODO check whether the model describing create subscription is present in schema
212         // Perhaps add a default schema context to support create-subscription if the model was not provided
213         // (same as what we do for base netconf operations in transformer)
214         final ListenableFuture<DOMRpcResult> rpcResultListenableFuture = deviceRpc.invokeRpc(
215                 NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME,
216                 NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
217
218         Futures.addCallback(rpcResultListenableFuture, new FutureCallback<DOMRpcResult>() {
219             @Override
220             public void onSuccess(final DOMRpcResult domRpcResult) {
221                 notificationHandler.addNotificationFilter(notification -> {
222                     if (NetconfCapabilityChange.QNAME.equals(notification.getBody().getIdentifier().getNodeType())) {
223                         LOG.info("{}: Schemas change detected, reconnecting", id);
224                         // Only disconnect is enough,
225                         // the reconnecting nature of the connector will take care of reconnecting
226                         listener.disconnect();
227                         return Optional.empty();
228                     }
229                     return Optional.of(notification);
230                 });
231             }
232
233             @Override
234             public void onFailure(final Throwable throwable) {
235                 LOG.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly",
236                         throwable);
237             }
238         }, MoreExecutors.directExecutor());
239     }
240
241     private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) {
242         return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
243     }
244
245     private synchronized void handleSalInitializationSuccess(final MountPointContext result,
246             final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc,
247             final RemoteDeviceCommunicator listener) {
248         //NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing,
249         //since salFacade.onDeviceDisconnected was already called.
250         if (connected) {
251             messageTransformer = new NetconfMessageTransformer(result, true,
252                 resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported()));
253
254             // salFacade.onDeviceConnected has to be called before the notification handler is initialized
255             salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc,
256                     deviceActionFactory == null ? null : deviceActionFactory.createDeviceAction(
257                             messageTransformer, listener, result.getEffectiveModelContext()));
258             notificationHandler.onRemoteSchemaUp(messageTransformer);
259
260             LOG.info("{}: Netconf connector initialized successfully", id);
261         } else {
262             LOG.warn("{}: Device communicator was closed before schema setup finished.", id);
263         }
264     }
265
266     private void handleSalInitializationFailure(final Throwable throwable, final RemoteDeviceCommunicator listener) {
267         LOG.error("{}: Initialization in sal failed, disconnecting from device", id, throwable);
268         listener.close();
269         onRemoteSessionDown();
270         resetMessageTransformer();
271     }
272
273     /**
274      * Set the transformer to null as is in initial state.
275      */
276     private void resetMessageTransformer() {
277         updateTransformer(null);
278     }
279
280     private synchronized void updateTransformer(final MessageTransformer transformer) {
281         messageTransformer = transformer;
282     }
283
284     private synchronized void setConnected(final boolean connected) {
285         this.connected = connected;
286     }
287
288     private ListenableFuture<EffectiveModelContext> assembleSchemaContext(final DeviceSources deviceSources,
289             final NetconfSessionPreferences remoteSessionCapabilities) {
290         LOG.debug("{}: Resolved device sources to {}", id, deviceSources);
291         final SchemaSourceProvider<YangTextSchemaSource> yangProvider = deviceSources.getSourceProvider();
292         for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) {
293             sourceRegistrations.add(schemaRegistry.registerSchemaSource(yangProvider,
294                 PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class,
295                     PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
296         }
297
298         return new SchemaSetup(deviceSources, remoteSessionCapabilities).startResolution();
299     }
300
301     private ListenableFuture<MountPointContext> createMountPointContext(final EffectiveModelContext schemaContext,
302             final BaseSchema baseSchema, final NetconfDeviceCommunicator listener) {
303         final MountPointContext emptyContext = new EmptyMountPointContext(schemaContext);
304         if (schemaContext.findModule(SchemaMountConstants.RFC8528_MODULE).isEmpty()) {
305             return Futures.immediateFuture(emptyContext);
306         }
307
308         // Create a temporary RPC invoker and acquire the mount point tree
309         LOG.debug("{}: Acquiring available mount points", id);
310         final NetconfDeviceRpc deviceRpc = new NetconfDeviceRpc(schemaContext, listener,
311             new NetconfMessageTransformer(emptyContext, false, baseSchema));
312
313         return Futures.transform(deviceRpc.invokeRpc(NetconfMessageTransformUtil.NETCONF_GET_QNAME,
314             Builders.containerBuilder().withNodeIdentifier(NETCONF_GET_NODEID)
315                 .withChild(NetconfMessageTransformUtil.toFilterStructure(RFC8528_SCHEMA_MOUNTS, schemaContext))
316                 .build()), rpcResult -> processSchemaMounts(rpcResult, emptyContext), MoreExecutors.directExecutor());
317     }
318
319     private MountPointContext processSchemaMounts(final DOMRpcResult rpcResult, final MountPointContext emptyContext) {
320         final Collection<? extends RpcError> errors = rpcResult.getErrors();
321         if (!errors.isEmpty()) {
322             LOG.warn("{}: Schema-mounts acquisition resulted in errors {}", id, errors);
323         }
324         final NormalizedNode schemaMounts = rpcResult.getResult();
325         if (schemaMounts == null) {
326             LOG.debug("{}: device does not define any schema mounts", id);
327             return emptyContext;
328         }
329         if (!(schemaMounts instanceof ContainerNode)) {
330             LOG.warn("{}: ignoring non-container schema mounts {}", id, schemaMounts);
331             return emptyContext;
332         }
333
334         return DeviceMountPointContext.create(emptyContext, (ContainerNode) schemaMounts);
335     }
336
337     @Override
338     public void onRemoteSessionDown() {
339         setConnected(false);
340         notificationHandler.onRemoteSchemaDown();
341
342         salFacade.onDeviceDisconnected();
343         sourceRegistrations.forEach(SchemaSourceRegistration::close);
344         sourceRegistrations.clear();
345         resetMessageTransformer();
346     }
347
348     @Override
349     public void onRemoteSessionFailed(final Throwable throwable) {
350         setConnected(false);
351         salFacade.onDeviceFailed(throwable);
352     }
353
354     @Override
355     public void onNotification(final NetconfMessage notification) {
356         notificationHandler.handleNotification(notification);
357     }
358
359     private BaseSchema resolveBaseSchema(final boolean notificationSupport) {
360         return notificationSupport ? baseSchemas.getBaseSchemaWithNotifications() : baseSchemas.getBaseSchema();
361     }
362
363     protected NetconfDeviceRpc getDeviceSpecificRpc(final MountPointContext result,
364             final RemoteDeviceCommunicator listener, final BaseSchema schema) {
365         return new NetconfDeviceRpc(result.getEffectiveModelContext(), listener,
366             new NetconfMessageTransformer(result, true, schema));
367     }
368
369     /**
370      * Just a transfer object containing schema related dependencies. Injected in constructor.
371      */
372     public static class SchemaResourcesDTO {
373         private final SchemaSourceRegistry schemaRegistry;
374         private final SchemaRepository schemaRepository;
375         private final EffectiveModelContextFactory schemaContextFactory;
376         private final NetconfDeviceSchemasResolver stateSchemasResolver;
377
378         public SchemaResourcesDTO(final SchemaSourceRegistry schemaRegistry,
379                                   final SchemaRepository schemaRepository,
380                                   final EffectiveModelContextFactory schemaContextFactory,
381                                   final NetconfDeviceSchemasResolver deviceSchemasResolver) {
382             this.schemaRegistry = requireNonNull(schemaRegistry);
383             this.schemaRepository = requireNonNull(schemaRepository);
384             this.schemaContextFactory = requireNonNull(schemaContextFactory);
385             stateSchemasResolver = requireNonNull(deviceSchemasResolver);
386         }
387
388         public SchemaSourceRegistry getSchemaRegistry() {
389             return schemaRegistry;
390         }
391
392         public SchemaRepository getSchemaRepository() {
393             return schemaRepository;
394         }
395
396         public EffectiveModelContextFactory getSchemaContextFactory() {
397             return schemaContextFactory;
398         }
399
400         public NetconfDeviceSchemasResolver getStateSchemasResolver() {
401             return stateSchemasResolver;
402         }
403     }
404
405     /**
406      * A dedicated exception to indicate when we fail to setup a SchemaContext.
407      *
408      * @author Robert Varga
409      */
410     private static final class EmptySchemaContextException extends Exception {
411         private static final long serialVersionUID = 1L;
412
413         EmptySchemaContextException(final String message) {
414             super(message);
415         }
416     }
417
418     /**
419      * Schema builder that tries to build schema context from provided sources or biggest subset of it.
420      */
421     private final class SchemaSetup implements FutureCallback<EffectiveModelContext> {
422         private final SettableFuture<EffectiveModelContext> resultFuture = SettableFuture.create();
423
424         private final DeviceSources deviceSources;
425         private final NetconfSessionPreferences remoteSessionCapabilities;
426         private final NetconfDeviceCapabilities capabilities;
427
428         private Collection<SourceIdentifier> requiredSources;
429
430         SchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities) {
431             this.deviceSources = deviceSources;
432             this.remoteSessionCapabilities = remoteSessionCapabilities;
433             capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
434
435             // If device supports notifications and does not contain necessary modules, add them automatically
436             if (remoteSessionCapabilities.containsNonModuleCapability(
437                     XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_NOTIFICATION_1_0)) {
438                 deviceSources.getRequiredSourcesQName().addAll(
439                         Arrays.asList(
440                                 org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714
441                                         .$YangModuleInfoImpl.getInstance().getName(),
442                                 org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715
443                                         .$YangModuleInfoImpl.getInstance().getName()
444                         )
445                 );
446             }
447
448             requiredSources = deviceSources.getRequiredSources();
449             final Collection<SourceIdentifier> missingSources = filterMissingSources(requiredSources);
450
451             capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(missingSources),
452                     UnavailableCapability.FailureReason.MissingSource);
453             requiredSources.removeAll(missingSources);
454         }
455
456         ListenableFuture<EffectiveModelContext> startResolution() {
457             trySetupSchema();
458             return resultFuture;
459         }
460
461         @Override
462         public void onSuccess(final EffectiveModelContext result) {
463             LOG.debug("{}: Schema context built successfully from {}", id, requiredSources);
464
465             final Collection<QName> filteredQNames = Sets.difference(deviceSources.getRequiredSourcesQName(),
466                     capabilities.getUnresolvedCapabilites().keySet());
467             capabilities.addCapabilities(filteredQNames.stream().map(entry -> new AvailableCapabilityBuilder()
468                     .setCapability(entry.toString()).setCapabilityOrigin(
469                             remoteSessionCapabilities.getModuleBasedCapsOrigin().get(entry)).build())
470                     .collect(Collectors.toList()));
471
472             capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities
473                     .getNonModuleCaps().stream().map(entry -> new AvailableCapabilityBuilder()
474                             .setCapability(entry).setCapabilityOrigin(
475                                     remoteSessionCapabilities.getNonModuleBasedCapsOrigin().get(entry)).build())
476                     .collect(Collectors.toList()));
477
478             resultFuture.set(result);
479         }
480
481         @Override
482         public void onFailure(final Throwable cause) {
483             // schemaBuilderFuture.checkedGet() throws only SchemaResolutionException
484             // that might be wrapping a MissingSchemaSourceException so we need to look
485             // at the cause of the exception to make sure we don't misinterpret it.
486             if (cause instanceof MissingSchemaSourceException) {
487                 requiredSources = handleMissingSchemaSourceException((MissingSchemaSourceException) cause);
488             } else if (cause instanceof SchemaResolutionException) {
489                 requiredSources = handleSchemaResolutionException((SchemaResolutionException) cause);
490             } else {
491                 LOG.debug("Unhandled failure", cause);
492                 resultFuture.setException(cause);
493                 // No more trying...
494                 return;
495             }
496
497             trySetupSchema();
498         }
499
500         private void trySetupSchema() {
501             if (!requiredSources.isEmpty()) {
502                 // Initiate async resolution, drive it back based on the result
503                 LOG.trace("{}: Trying to build schema context from {}", id, requiredSources);
504                 Futures.addCallback(schemaContextFactory.createEffectiveModelContext(requiredSources), this,
505                     MoreExecutors.directExecutor());
506             } else {
507                 LOG.debug("{}: no more sources for schema context", id);
508                 resultFuture.setException(new EmptySchemaContextException(id + ": No more sources for schema context"));
509             }
510         }
511
512         private Collection<SourceIdentifier> filterMissingSources(final Collection<SourceIdentifier> origSources) {
513             return origSources.parallelStream().filter(sourceIdentifier -> {
514                 try {
515                     schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class).get();
516                     return false;
517                 } catch (InterruptedException | ExecutionException e) {
518                     return true;
519                 }
520             }).collect(Collectors.toList());
521         }
522
523         private Collection<SourceIdentifier> handleMissingSchemaSourceException(
524                 final MissingSchemaSourceException exception) {
525             // In case source missing, try without it
526             final SourceIdentifier missingSource = exception.getSourceId();
527             LOG.warn("{}: Unable to build schema context, missing source {}, will reattempt without it",
528                 id, missingSource);
529             LOG.debug("{}: Unable to build schema context, missing source {}, will reattempt without it",
530                 id, missingSource, exception);
531             final Collection<QName> qNameOfMissingSource =
532                 getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource));
533             if (!qNameOfMissingSource.isEmpty()) {
534                 capabilities.addUnresolvedCapabilities(
535                         qNameOfMissingSource, UnavailableCapability.FailureReason.MissingSource);
536             }
537             return stripUnavailableSource(missingSource);
538         }
539
540         private Collection<SourceIdentifier> handleSchemaResolutionException(
541                 final SchemaResolutionException resolutionException) {
542             // In case resolution error, try only with resolved sources
543             // There are two options why schema resolution exception occurred : unsatisfied imports or flawed model
544             // FIXME Do we really have assurance that these two cases cannot happen at once?
545             if (resolutionException.getFailedSource() != null) {
546                 // flawed model - exclude it
547                 final SourceIdentifier failedSourceId = resolutionException.getFailedSource();
548                 LOG.warn("{}: Unable to build schema context, failed to resolve source {}, will reattempt without it",
549                     id, failedSourceId);
550                 LOG.warn("{}: Unable to build schema context, failed to resolve source {}, will reattempt without it",
551                     id, failedSourceId, resolutionException);
552                 capabilities.addUnresolvedCapabilities(
553                         getQNameFromSourceIdentifiers(Collections.singleton(failedSourceId)),
554                         UnavailableCapability.FailureReason.UnableToResolve);
555                 return stripUnavailableSource(resolutionException.getFailedSource());
556             }
557             // unsatisfied imports
558             final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
559             capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources),
560                 UnavailableCapability.FailureReason.UnableToResolve);
561             LOG.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only",
562                 id, resolutionException.getUnsatisfiedImports());
563             LOG.debug("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only",
564                 id, resolutionException.getUnsatisfiedImports(), resolutionException);
565             return resolutionException.getResolvedSources();
566         }
567
568         private Collection<SourceIdentifier> stripUnavailableSource(final SourceIdentifier sourceIdToRemove) {
569             final LinkedList<SourceIdentifier> sourceIdentifiers = new LinkedList<>(requiredSources);
570             checkState(sourceIdentifiers.remove(sourceIdToRemove),
571                     "%s: Trying to remove %s from %s failed", id, sourceIdToRemove, requiredSources);
572             return sourceIdentifiers;
573         }
574
575         private Collection<QName> getQNameFromSourceIdentifiers(final Collection<SourceIdentifier> identifiers) {
576             final Collection<QName> qNames = Collections2.transform(identifiers, this::getQNameFromSourceIdentifier);
577
578             if (qNames.isEmpty()) {
579                 LOG.debug("{}: Unable to map any source identifiers to a capability reported by device : {}", id,
580                         identifiers);
581             }
582             return Collections2.filter(qNames, Predicates.notNull());
583         }
584
585         private QName getQNameFromSourceIdentifier(final SourceIdentifier identifier) {
586             // Required sources are all required and provided merged in DeviceSourcesResolver
587             for (final QName qname : deviceSources.getRequiredSourcesQName()) {
588                 if (!qname.getLocalName().equals(identifier.name().getLocalName())) {
589                     continue;
590                 }
591
592                 if (Objects.equals(identifier.revision(), qname.getRevision().orElse(null))) {
593                     return qname;
594                 }
595             }
596             LOG.warn("Unable to map identifier to a devices reported capability: {} Available: {}",identifier,
597                     deviceSources.getRequiredSourcesQName());
598             // return null since we cannot find the QName,
599             // this capability will be removed from required sources and not reported as unresolved-capability
600             return null;
601         }
602     }
603 }