CDS: Add stress test RPC to the cars model
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.Collections2;
15 import com.google.common.collect.Lists;
16 import com.google.common.collect.Sets;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.ListeningExecutorService;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import java.util.Collection;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutorService;
29 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
30 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.controller.netconf.api.NetconfMessage;
34 import org.opendaylight.controller.sal.connect.api.MessageTransformer;
35 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
36 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
37 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
38 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
39 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
40 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
41 import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
42 import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider;
43 import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
44 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
45 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.$YangModuleInfoImpl;
47 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability;
49 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
50 import org.opendaylight.yangtools.yang.common.QName;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
52 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
53 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
54 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
55 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
56 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
57 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
58 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
59 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
60 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
66  */
67 public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
68
69     private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
70
71     /**
72      * Initial schema context contains schemas for netconf monitoring and netconf notifications
73      */
74     public static final SchemaContext INIT_SCHEMA_CTX;
75
76     static {
77         try {
78             final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
79             moduleInfoBackedContext.addModuleInfos(
80                     Lists.newArrayList(
81                             $YangModuleInfoImpl.getInstance(),
82                             org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.$YangModuleInfoImpl.getInstance(),
83                             org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.$YangModuleInfoImpl.getInstance()));
84             INIT_SCHEMA_CTX = moduleInfoBackedContext.tryToCreateSchemaContext().get();
85         } catch (final RuntimeException e) {
86             logger.error("Unable to prepare schema context for netconf initialization", e);
87             throw new ExceptionInInitializerError(e);
88         }
89     }
90
91     public static final Function<QName, SourceIdentifier> QNAME_TO_SOURCE_ID_FUNCTION = new Function<QName, SourceIdentifier>() {
92         @Override
93         public SourceIdentifier apply(final QName input) {
94             return new SourceIdentifier(input.getLocalName(), Optional.fromNullable(input.getFormattedRevision()));
95         }
96     };
97
98     private final RemoteDeviceId id;
99     private final boolean reconnectOnSchemasChange;
100
101     private final SchemaContextFactory schemaContextFactory;
102     private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
103     private final ListeningExecutorService processingExecutor;
104     private final SchemaSourceRegistry schemaRegistry;
105     private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
106     private final NotificationHandler notificationHandler;
107     private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
108
109     // Message transformer is constructed once the schemas are available
110     private MessageTransformer<NetconfMessage> messageTransformer;
111
112     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
113                          final ExecutorService globalProcessingExecutor) {
114         this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, false);
115     }
116
117     /**
118      * Create rpc implementation capable of handling RPC for monitoring and notifications even before the schemas of remote device are downloaded
119      */
120     static NetconfDeviceRpc getRpcForInitialization(final NetconfDeviceCommunicator listener) {
121         return new NetconfDeviceRpc(INIT_SCHEMA_CTX, listener, new NetconfMessageTransformer(INIT_SCHEMA_CTX, false));
122     }
123
124
125     // FIXME reduce parameters
126     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
127                          final ExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) {
128         this.id = id;
129         this.reconnectOnSchemasChange = reconnectOnSchemasChange;
130         this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
131         this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
132         this.salFacade = salFacade;
133         this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver();
134         this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor);
135         this.notificationHandler = new NotificationHandler(salFacade, id);
136     }
137
138     @Override
139     public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
140                                   final NetconfDeviceCommunicator listener) {
141         // SchemaContext setup has to be performed in a dedicated thread since
142         // we are in a netty thread in this method
143         // Yang models are being downloaded in this method and it would cause a
144         // deadlock if we used the netty thread
145         // http://netty.io/wiki/thread-model.html
146         logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
147
148         final NetconfDeviceRpc initRpc = getRpcForInitialization(listener);
149         final DeviceSourcesResolver task = new DeviceSourcesResolver(remoteSessionCapabilities, id, stateSchemasResolver, initRpc);
150         final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(task);
151
152         if(shouldListenOnSchemaChange(remoteSessionCapabilities)) {
153            registerToBaseNetconfStream(initRpc, listener);
154         }
155
156         final FutureCallback<DeviceSources> resolvedSourceCallback = new FutureCallback<DeviceSources>() {
157             @Override
158             public void onSuccess(final DeviceSources result) {
159                 addProvidedSourcesToSchemaRegistry(initRpc, result);
160                 setUpSchema(result);
161             }
162
163             private void setUpSchema(final DeviceSources result) {
164                 processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, listener));
165             }
166
167             @Override
168             public void onFailure(final Throwable t) {
169                 logger.warn("{}: Unexpected error resolving device sources: {}", id, t);
170                 handleSalInitializationFailure(t, listener);
171             }
172         };
173
174         Futures.addCallback(sourceResolverFuture, resolvedSourceCallback);
175     }
176
177     private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) {
178        // TODO check whether the model describing create subscription is present in schema
179         // Perhaps add a default schema context to support create-subscription if the model was not provided (same as what we do for base netconf operations in transformer)
180        final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResultListenableFuture =
181                 deviceRpc.invokeRpc(NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME), NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT);
182
183         final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() {
184             @Override
185             public Optional<DOMNotification> filterNotification(final DOMNotification notification) {
186                 if (isCapabilityChanged(notification)) {
187                     logger.info("{}: Schemas change detected, reconnecting", id);
188                     // Only disconnect is enough, the reconnecting nature of the connector will take care of reconnecting
189                     listener.disconnect();
190                     return Optional.absent();
191                 }
192                 return Optional.of(notification);
193             }
194
195             private boolean isCapabilityChanged(final DOMNotification notification) {
196                 return notification.getBody().getNodeType().equals(NetconfCapabilityChange.QNAME);
197             }
198         };
199
200         Futures.addCallback(rpcResultListenableFuture, new FutureCallback<DOMRpcResult>() {
201             @Override
202             public void onSuccess(final DOMRpcResult domRpcResult) {
203                 notificationHandler.addNotificationFilter(filter);
204             }
205
206             @Override
207             public void onFailure(final Throwable t) {
208                 logger.warn("Unable to subscribe to base notification stream. Schemas will not be reloaded on the fly", t);
209             }
210         });
211     }
212
213     private boolean shouldListenOnSchemaChange(final NetconfSessionPreferences remoteSessionCapabilities) {
214         return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
215     }
216
217     @VisibleForTesting
218     void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
219         messageTransformer = new NetconfMessageTransformer(result, true);
220
221         updateTransformer(messageTransformer);
222         // salFacade.onDeviceConnected has to be called before the notification handler is initialized
223         salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
224         notificationHandler.onRemoteSchemaUp(messageTransformer);
225
226         logger.info("{}: Netconf connector initialized successfully", id);
227     }
228
229     private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
230         logger.error("{}: Initialization in sal failed, disconnecting from device", id, t);
231         listener.close();
232         onRemoteSessionDown();
233         resetMessageTransformer();
234     }
235
236     /**
237      * Set the transformer to null as is in initial state
238      */
239     private void resetMessageTransformer() {
240         updateTransformer(null);
241     }
242
243     private void updateTransformer(final MessageTransformer<NetconfMessage> transformer) {
244         messageTransformer = transformer;
245     }
246
247     private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) {
248         final NetconfRemoteSchemaYangSourceProvider yangProvider = new NetconfRemoteSchemaYangSourceProvider(id, deviceRpc);
249         for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) {
250             sourceRegistrations.add(schemaRegistry.registerSchemaSource(yangProvider,
251                     PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
252         }
253     }
254
255     @Override
256     public void onRemoteSessionDown() {
257         notificationHandler.onRemoteSchemaDown();
258
259         salFacade.onDeviceDisconnected();
260         for (final SchemaSourceRegistration<? extends SchemaSourceRepresentation> sourceRegistration : sourceRegistrations) {
261             sourceRegistration.close();
262         }
263         resetMessageTransformer();
264     }
265
266     @Override
267     public void onRemoteSessionFailed(final Throwable throwable) {
268         salFacade.onDeviceFailed(throwable);
269     }
270
271     @Override
272     public void onNotification(final NetconfMessage notification) {
273         notificationHandler.handleNotification(notification);
274     }
275
276     /**
277      * Just a transfer object containing schema related dependencies. Injected in constructor.
278      */
279     public static class SchemaResourcesDTO {
280         private final SchemaSourceRegistry schemaRegistry;
281         private final SchemaContextFactory schemaContextFactory;
282         private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
283
284         public SchemaResourcesDTO(final SchemaSourceRegistry schemaRegistry, final SchemaContextFactory schemaContextFactory, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
285             this.schemaRegistry = Preconditions.checkNotNull(schemaRegistry);
286             this.schemaContextFactory = Preconditions.checkNotNull(schemaContextFactory);
287             this.stateSchemasResolver = Preconditions.checkNotNull(stateSchemasResolver);
288         }
289
290         public SchemaSourceRegistry getSchemaRegistry() {
291             return schemaRegistry;
292         }
293
294         public SchemaContextFactory getSchemaContextFactory() {
295             return schemaContextFactory;
296         }
297
298         public NetconfStateSchemas.NetconfStateSchemasResolver getStateSchemasResolver() {
299             return stateSchemasResolver;
300         }
301     }
302
303     /**
304      * Schema building callable.
305      */
306     private static class DeviceSourcesResolver implements Callable<DeviceSources> {
307
308         private final NetconfDeviceRpc deviceRpc;
309         private final NetconfSessionPreferences remoteSessionCapabilities;
310         private final RemoteDeviceId id;
311         private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
312
313         DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities,
314                                      final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
315             this.deviceRpc = deviceRpc;
316             this.remoteSessionCapabilities = remoteSessionCapabilities;
317             this.id = id;
318             this.stateSchemasResolver = stateSchemasResolver;
319         }
320
321         public DeviceSourcesResolver(final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver, final NetconfDeviceRpc rpcForMonitoring) {
322             this(rpcForMonitoring, remoteSessionCapabilities, id, stateSchemasResolver);
323         }
324
325         @Override
326         public DeviceSources call() throws Exception {
327             final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
328             logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
329
330             final Set<QName> requiredSources = Sets.newHashSet(remoteSessionCapabilities.getModuleBasedCaps());
331             final Set<QName> providedSources = availableSchemas.getAvailableYangSchemasQNames();
332
333             final Set<QName> requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources);
334             if (!requiredSourcesNotProvided.isEmpty()) {
335                 logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}",
336                         id, requiredSourcesNotProvided);
337                 logger.warn("{}: Attempting to build schema context from required sources", id);
338             }
339
340             // Here all the sources reported in netconf monitoring are merged with those reported in hello.
341             // It is necessary to perform this since submodules are not mentioned in hello but still required.
342             // This clashes with the option of a user to specify supported yang models manually in configuration for netconf-connector
343             // and as a result one is not able to fully override yang models of a device. It is only possible to add additional models.
344             final Set<QName> providedSourcesNotRequired = Sets.difference(providedSources, requiredSources);
345             if (!providedSourcesNotRequired.isEmpty()) {
346                 logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}",
347                         id, providedSourcesNotRequired);
348                 logger.warn("{}: Adding provided but not required sources as required to prevent failures", id);
349                 logger.debug("{}: Netconf device reported in hello: {}", id, requiredSources);
350                 requiredSources.addAll(providedSourcesNotRequired);
351             }
352
353             return new DeviceSources(requiredSources, providedSources);
354         }
355     }
356
357     /**
358      * Contains RequiredSources - sources from capabilities.
359      */
360     private static final class DeviceSources {
361         private final Set<QName> requiredSources;
362         private final Set<QName> providedSources;
363
364         public DeviceSources(final Set<QName> requiredSources, final Set<QName> providedSources) {
365             this.requiredSources = requiredSources;
366             this.providedSources = providedSources;
367         }
368
369         public Set<QName> getRequiredSourcesQName() {
370             return requiredSources;
371         }
372
373         public Set<QName> getProvidedSourcesQName() {
374             return providedSources;
375         }
376
377         public Collection<SourceIdentifier> getRequiredSources() {
378             return Collections2.transform(requiredSources, QNAME_TO_SOURCE_ID_FUNCTION);
379         }
380
381         public Collection<SourceIdentifier> getProvidedSources() {
382             return Collections2.transform(providedSources, QNAME_TO_SOURCE_ID_FUNCTION);
383         }
384
385     }
386
387     /**
388      * Schema builder that tries to build schema context from provided sources or biggest subset of it.
389      */
390     private final class RecursiveSchemaSetup implements Runnable {
391         private final DeviceSources deviceSources;
392         private final NetconfSessionPreferences remoteSessionCapabilities;
393         private final RemoteDeviceCommunicator<NetconfMessage> listener;
394         private final NetconfDeviceCapabilities capabilities;
395
396         public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceCommunicator<NetconfMessage> listener) {
397             this.deviceSources = deviceSources;
398             this.remoteSessionCapabilities = remoteSessionCapabilities;
399             this.listener = listener;
400             this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
401         }
402
403         @Override
404         public void run() {
405             setUpSchema(deviceSources.getRequiredSources());
406         }
407
408         /**
409          * Recursively build schema context, in case of success or final failure notify device
410          */
411         // FIXME reimplement without recursion
412         private void setUpSchema(final Collection<SourceIdentifier> requiredSources) {
413             logger.trace("{}: Trying to build schema context from {}", id, requiredSources);
414
415             // If no more sources, fail
416             if(requiredSources.isEmpty()) {
417                 final IllegalStateException cause = new IllegalStateException(id + ": No more sources for schema context");
418                 handleSalInitializationFailure(cause, listener);
419                 salFacade.onDeviceFailed(cause);
420                 return;
421             }
422
423             final CheckedFuture<SchemaContext, SchemaResolutionException> schemaBuilderFuture = schemaContextFactory.createSchemaContext(requiredSources);
424
425             final FutureCallback<SchemaContext> RecursiveSchemaBuilderCallback = new FutureCallback<SchemaContext>() {
426
427                 @Override
428                 public void onSuccess(final SchemaContext result) {
429                     logger.debug("{}: Schema context built successfully from {}", id, requiredSources);
430                     final Collection<QName> filteredQNames = Sets.difference(deviceSources.getProvidedSourcesQName(), capabilities.getUnresolvedCapabilites().keySet());
431                     capabilities.addCapabilities(filteredQNames);
432                     capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps());
433                     handleSalInitializationSuccess(result, remoteSessionCapabilities, getDeviceSpecificRpc(result));
434                 }
435
436                 @Override
437                 public void onFailure(final Throwable t) {
438                     // In case source missing, try without it
439                     if (t instanceof MissingSchemaSourceException) {
440                         final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId();
441                         logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource);
442                         capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), UnavailableCapability.FailureReason.MissingSource);
443                         setUpSchema(stripMissingSource(requiredSources, missingSource));
444
445                     // In case resolution error, try only with resolved sources
446                     } else if (t instanceof SchemaResolutionException) {
447                         // TODO check for infinite loop
448                         final SchemaResolutionException resolutionException = (SchemaResolutionException) t;
449                         final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
450                         capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), UnavailableCapability.FailureReason.UnableToResolve);
451                         logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports());
452                         setUpSchema(resolutionException.getResolvedSources());
453                     // unknown error, fail
454                     } else {
455                         handleSalInitializationFailure(t, listener);
456                     }
457                 }
458             };
459
460             Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback);
461         }
462
463         private NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
464             return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
465         }
466
467         private Collection<SourceIdentifier> stripMissingSource(final Collection<SourceIdentifier> requiredSources, final SourceIdentifier sIdToRemove) {
468             final LinkedList<SourceIdentifier> sourceIdentifiers = Lists.newLinkedList(requiredSources);
469             final boolean removed = sourceIdentifiers.remove(sIdToRemove);
470             Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources);
471             return sourceIdentifiers;
472         }
473
474         private Collection<QName> getQNameFromSourceIdentifiers(final Collection<SourceIdentifier> identifiers) {
475             final Collection<QName> qNames = Collections2.transform(identifiers, new Function<SourceIdentifier, QName>() {
476                 @Override
477                 public QName apply(final SourceIdentifier sourceIdentifier) {
478                     return getQNameFromSourceIdentifier(sourceIdentifier);
479                 }
480             });
481
482             if (qNames.isEmpty()) {
483                 logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers);
484             }
485             return qNames;
486         }
487
488         private QName getQNameFromSourceIdentifier(final SourceIdentifier identifier) {
489             // Required sources are all required and provided merged in DeviceSourcesResolver
490             for (final QName qname : deviceSources.getRequiredSourcesQName()) {
491                 if(qname.getLocalName().equals(identifier.getName()) == false) {
492                     continue;
493                 }
494
495                 if(identifier.getRevision().equals(SourceIdentifier.NOT_PRESENT_FORMATTED_REVISION) &&
496                         qname.getRevision() == null) {
497                     return qname;
498                 }
499
500                 if (qname.getFormattedRevision().equals(identifier.getRevision())) {
501                     return qname;
502                 }
503             }
504             throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier + " Available: " + deviceSources.getRequiredSourcesQName());
505         }
506     }
507 }