BUG-1511 - Datastore: cleanup ListenerTreeAPI
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Collections2;
14 import com.google.common.collect.Lists;
15 import com.google.common.collect.Sets;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.ListeningExecutorService;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutorService;
29 import org.opendaylight.controller.netconf.api.NetconfMessage;
30 import org.opendaylight.controller.sal.connect.api.MessageTransformer;
31 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
32 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
33 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
34 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
35 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
36 import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
37 import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider;
38 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason;
40 import org.opendaylight.yangtools.yang.common.QName;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
43 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
44 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
45 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
46 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
47 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
48 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
49 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
50 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
56  */
57 public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage> {
58
59     private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
60
61     public static final Function<QName, SourceIdentifier> QNAME_TO_SOURCE_ID_FUNCTION = new Function<QName, SourceIdentifier>() {
62         @Override
63         public SourceIdentifier apply(final QName input) {
64             return new SourceIdentifier(input.getLocalName(), Optional.fromNullable(input.getFormattedRevision()));
65         }
66     };
67
68     private final RemoteDeviceId id;
69
70     private final SchemaContextFactory schemaContextFactory;
71     private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
72     private final ListeningExecutorService processingExecutor;
73     private final SchemaSourceRegistry schemaRegistry;
74     private final MessageTransformer<NetconfMessage> messageTransformer;
75     private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
76     private final NotificationHandler notificationHandler;
77     private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
78
79     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
80                          final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer) {
81         this.id = id;
82         this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
83         this.messageTransformer = messageTransformer;
84         this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
85         this.salFacade = salFacade;
86         this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver();
87         this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor);
88         this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
89     }
90
91     @Override
92     public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
93                                   final RemoteDeviceCommunicator<NetconfMessage> listener) {
94         // SchemaContext setup has to be performed in a dedicated thread since
95         // we are in a netty thread in this method
96         // Yang models are being downloaded in this method and it would cause a
97         // deadlock if we used the netty thread
98         // http://netty.io/wiki/thread-model.html
99         logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
100
101         final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(listener);
102
103         final DeviceSourcesResolver task = new DeviceSourcesResolver(deviceRpc, remoteSessionCapabilities, id, stateSchemasResolver);
104         final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(task);
105
106         final FutureCallback<DeviceSources> resolvedSourceCallback = new FutureCallback<DeviceSources>() {
107             @Override
108             public void onSuccess(final DeviceSources result) {
109                 addProvidedSourcesToSchemaRegistry(deviceRpc, result);
110                 setUpSchema(result);
111             }
112
113             private void setUpSchema(final DeviceSources result) {
114                 processingExecutor.submit(new RecursiveSchemaSetup(result, remoteSessionCapabilities, deviceRpc, listener));
115             }
116
117             @Override
118             public void onFailure(final Throwable t) {
119                 logger.warn("{}: Unexpected error resolving device sources: {}", id, t);
120                 handleSalInitializationFailure(t, listener);
121             }
122         };
123
124         Futures.addCallback(sourceResolverFuture, resolvedSourceCallback);
125
126     }
127
128     private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) {
129         updateMessageTransformer(result);
130         salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
131         notificationHandler.onRemoteSchemaUp();
132
133         logger.debug("{}: Initialization in sal successful", id);
134         logger.info("{}: Netconf connector initialized successfully", id);
135     }
136
137     private void handleSalInitializationFailure(final Throwable t, final RemoteDeviceCommunicator<NetconfMessage> listener) {
138         logger.error("{}: Initialization in sal failed, disconnecting from device", id, t);
139         listener.close();
140         onRemoteSessionDown();
141         resetMessageTransformer();
142     }
143
144     /**
145      * Set the schema context inside transformer to null as is in initial state
146      */
147     private void resetMessageTransformer() {
148         updateMessageTransformer(null);
149     }
150
151     /**
152      * Update initial message transformer to use retrieved schema
153      * @param currentSchemaContext
154      */
155     private void updateMessageTransformer(final SchemaContext currentSchemaContext) {
156         messageTransformer.onGlobalContextUpdated(currentSchemaContext);
157     }
158
159     private void addProvidedSourcesToSchemaRegistry(final NetconfDeviceRpc deviceRpc, final DeviceSources deviceSources) {
160         final NetconfRemoteSchemaYangSourceProvider yangProvider = new NetconfRemoteSchemaYangSourceProvider(id, deviceRpc);
161         for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) {
162             sourceRegistrations.add(schemaRegistry.registerSchemaSource(yangProvider,
163                     PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
164         }
165     }
166
167     private NetconfDeviceRpc setUpDeviceRpc(final RemoteDeviceCommunicator<NetconfMessage> listener) {
168        return new NetconfDeviceRpc(listener, messageTransformer);
169     }
170
171     @Override
172     public void onRemoteSessionDown() {
173         salFacade.onDeviceDisconnected();
174         for (final SchemaSourceRegistration<? extends SchemaSourceRepresentation> sourceRegistration : sourceRegistrations) {
175             sourceRegistration.close();
176         }
177         resetMessageTransformer();
178     }
179
180     @Override
181     public void onRemoteSessionFailed(Throwable throwable) {
182         salFacade.onDeviceFailed(throwable);
183     }
184
185     @Override
186     public void onNotification(final NetconfMessage notification) {
187         notificationHandler.handleNotification(notification);
188     }
189
190     /**
191      * Just a transfer object containing schema related dependencies. Injected in constructor.
192      */
193     public static class SchemaResourcesDTO {
194         private final SchemaSourceRegistry schemaRegistry;
195         private final SchemaContextFactory schemaContextFactory;
196         private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
197
198         public SchemaResourcesDTO(final SchemaSourceRegistry schemaRegistry, final SchemaContextFactory schemaContextFactory, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
199             this.schemaRegistry = Preconditions.checkNotNull(schemaRegistry);
200             this.schemaContextFactory = Preconditions.checkNotNull(schemaContextFactory);
201             this.stateSchemasResolver = Preconditions.checkNotNull(stateSchemasResolver);
202         }
203
204         public SchemaSourceRegistry getSchemaRegistry() {
205             return schemaRegistry;
206         }
207
208         public SchemaContextFactory getSchemaContextFactory() {
209             return schemaContextFactory;
210         }
211
212         public NetconfStateSchemas.NetconfStateSchemasResolver getStateSchemasResolver() {
213             return stateSchemasResolver;
214         }
215     }
216
217     /**
218      * Schema building callable.
219      */
220     private static class DeviceSourcesResolver implements Callable<DeviceSources> {
221         private final NetconfDeviceRpc deviceRpc;
222         private final NetconfSessionPreferences remoteSessionCapabilities;
223         private final RemoteDeviceId id;
224         private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
225
226         public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
227             this.deviceRpc = deviceRpc;
228             this.remoteSessionCapabilities = remoteSessionCapabilities;
229             this.id = id;
230             this.stateSchemasResolver = stateSchemasResolver;
231         }
232
233         @Override
234         public DeviceSources call() throws Exception {
235
236             final Set<SourceIdentifier> requiredSources = Sets.newHashSet(Collections2.transform(
237                     remoteSessionCapabilities.getModuleBasedCaps(), QNAME_TO_SOURCE_ID_FUNCTION));
238
239             // If monitoring is not supported, we will still attempt to create schema, sources might be already provided
240             final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
241             logger.debug("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
242
243             final Set<SourceIdentifier> providedSources = Sets.newHashSet(Collections2.transform(
244                     availableSchemas.getAvailableYangSchemasQNames(), QNAME_TO_SOURCE_ID_FUNCTION));
245
246             final Set<SourceIdentifier> requiredSourcesNotProvided = Sets.difference(requiredSources, providedSources);
247
248             if (!requiredSourcesNotProvided.isEmpty()) {
249                 logger.warn("{}: Netconf device does not provide all yang models reported in hello message capabilities, required but not provided: {}",
250                         id, requiredSourcesNotProvided);
251                 logger.warn("{}: Attempting to build schema context from required sources", id);
252             }
253
254
255             // TODO should we perform this ? We have a mechanism to fix initialization of devices not reporting or required modules in hello
256             // That is overriding capabilities in configuration using attribute yang-module-capabilities
257             // This is more user friendly even though it clashes with attribute yang-module-capabilities
258             // Some devices do not report all required models in hello message, but provide them
259             final Set<SourceIdentifier> providedSourcesNotRequired = Sets.difference(providedSources, requiredSources);
260             if (!providedSourcesNotRequired.isEmpty()) {
261                 logger.warn("{}: Netconf device provides additional yang models not reported in hello message capabilities: {}",
262                         id, providedSourcesNotRequired);
263                 logger.warn("{}: Adding provided but not required sources as required to prevent failures", id);
264                 requiredSources.addAll(providedSourcesNotRequired);
265             }
266
267             return new DeviceSources(requiredSources, providedSources);
268         }
269     }
270
271     /**
272      * Contains RequiredSources - sources from capabilities.
273      *
274      */
275     private static final class DeviceSources {
276         private final Collection<SourceIdentifier> requiredSources;
277         private final Collection<SourceIdentifier> providedSources;
278
279         public DeviceSources(final Collection<SourceIdentifier> requiredSources, final Collection<SourceIdentifier> providedSources) {
280             this.requiredSources = requiredSources;
281             this.providedSources = providedSources;
282         }
283
284         public Collection<SourceIdentifier> getRequiredSources() {
285             return requiredSources;
286         }
287
288         public Collection<SourceIdentifier> getProvidedSources() {
289             return providedSources;
290         }
291
292     }
293
294     /**
295      * Schema builder that tries to build schema context from provided sources or biggest subset of it.
296      */
297     private final class RecursiveSchemaSetup implements Runnable {
298         private final DeviceSources deviceSources;
299         private final NetconfSessionPreferences remoteSessionCapabilities;
300         private final NetconfDeviceRpc deviceRpc;
301         private final RemoteDeviceCommunicator<NetconfMessage> listener;
302         private NetconfDeviceCapabilities capabilities;
303
304         public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator<NetconfMessage> listener) {
305             this.deviceSources = deviceSources;
306             this.remoteSessionCapabilities = remoteSessionCapabilities;
307             this.deviceRpc = deviceRpc;
308             this.listener = listener;
309             this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
310         }
311
312         @Override
313         public void run() {
314             setUpSchema(deviceSources.getRequiredSources());
315         }
316
317         /**
318          * Recursively build schema context, in case of success or final failure notify device
319          */
320         // FIXME reimplement without recursion
321         private void setUpSchema(final Collection<SourceIdentifier> requiredSources) {
322             logger.trace("{}: Trying to build schema context from {}", id, requiredSources);
323
324             // If no more sources, fail
325             if(requiredSources.isEmpty()) {
326                 handleSalInitializationFailure(new IllegalStateException(id + ": No more sources for schema context"), listener);
327                 return;
328             }
329
330             final CheckedFuture<SchemaContext, SchemaResolutionException> schemaBuilderFuture = schemaContextFactory.createSchemaContext(requiredSources);
331
332             final FutureCallback<SchemaContext> RecursiveSchemaBuilderCallback = new FutureCallback<SchemaContext>() {
333
334                 @Override
335                 public void onSuccess(final SchemaContext result) {
336                     logger.debug("{}: Schema context built successfully from {}", id, requiredSources);
337                     Collection<QName> filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet());
338                     capabilities.addCapabilities(filteredQNames);
339                     capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps());
340                     handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
341                 }
342
343                 @Override
344                 public void onFailure(final Throwable t) {
345                     // In case source missing, try without it
346                     if (t instanceof MissingSchemaSourceException) {
347                         final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId();
348                         logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource);
349                         capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), FailureReason.MissingSource);
350                         setUpSchema(stripMissingSource(requiredSources, missingSource));
351
352                     // In case resolution error, try only with resolved sources
353                     } else if (t instanceof SchemaResolutionException) {
354                         // TODO check for infinite loop
355                         final SchemaResolutionException resolutionException = (SchemaResolutionException) t;
356                         final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
357                         capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), FailureReason.UnableToResolve);
358                         logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports());
359                         setUpSchema(resolutionException.getResolvedSources());
360                     // unknown error, fail
361                     } else {
362                         handleSalInitializationFailure(t, listener);
363                     }
364                 }
365             };
366
367             Futures.addCallback(schemaBuilderFuture, RecursiveSchemaBuilderCallback);
368         }
369
370         private Collection<SourceIdentifier> stripMissingSource(final Collection<SourceIdentifier> requiredSources, final SourceIdentifier sIdToRemove) {
371             final LinkedList<SourceIdentifier> sourceIdentifiers = Lists.newLinkedList(requiredSources);
372             final boolean removed = sourceIdentifiers.remove(sIdToRemove);
373             Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources);
374             return sourceIdentifiers;
375         }
376
377         private Collection<QName> getQNameFromSourceIdentifiers(Collection<SourceIdentifier> identifiers) {
378             Collection<QName> qNames = new HashSet<>();
379             for (SourceIdentifier source : identifiers) {
380                 Optional<QName> qname = getQNameFromSourceIdentifier(source);
381                 if (qname.isPresent()) {
382                     qNames.add(qname.get());
383                 }
384             }
385             if (qNames.isEmpty()) {
386                 logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers);
387             }
388             return qNames;
389         }
390
391         private Optional<QName> getQNameFromSourceIdentifier(SourceIdentifier identifier) {
392             for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) {
393                 if (qname.getLocalName().equals(identifier.getName())
394                         && qname.getFormattedRevision().equals(identifier.getRevision())) {
395                     return Optional.of(qname);
396                 }
397             }
398             throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier);
399         }
400     }
401 }