Merge "Add exists method on DOMStoreReadTransaction and DOMDataReadTransaction"
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import java.io.InputStream;
11 import java.util.LinkedList;
12 import java.util.List;
13 import java.util.concurrent.ExecutorService;
14
15 import org.opendaylight.controller.netconf.api.NetconfMessage;
16 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
17 import org.opendaylight.controller.sal.connect.api.MessageTransformer;
18 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
19 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
20 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
21 import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory;
22 import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory;
23 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
24 import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
25 import org.opendaylight.controller.sal.connect.netconf.schema.NetconfDeviceSchemaProviderFactory;
26 import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaSourceProvider;
27 import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
28 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
29 import org.opendaylight.controller.sal.core.api.RpcImplementation;
30 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
32 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
33 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.annotations.VisibleForTesting;
38 import com.google.common.base.Preconditions;
39 import com.google.common.util.concurrent.FutureCallback;
40 import com.google.common.util.concurrent.Futures;
41 import com.google.common.util.concurrent.ListenableFuture;
42 import com.google.common.util.concurrent.ListeningExecutorService;
43 import com.google.common.util.concurrent.MoreExecutors;
44
45 /**
46  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
47  */
48 public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilities, NetconfMessage> {
49
50     private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
51
52     private final RemoteDeviceId id;
53
54     private final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade;
55     private final ListeningExecutorService processingExecutor;
56     private final MessageTransformer<NetconfMessage> messageTransformer;
57     private final SchemaContextProviderFactory schemaContextProviderFactory;
58     private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
59     private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
60     private final NotificationHandler notificationHandler;
61
62     public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
63             final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
64             final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
65         return createNetconfDevice(id, schemaSourceProvider, executor, salFacade, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
66     }
67
68     @VisibleForTesting
69     protected static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
70             final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
71             final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
72             final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
73
74         return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(),
75                 new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory<InputStream>() {
76                     @Override
77                     public SchemaSourceProvider<InputStream> createSourceProvider(final RpcImplementation deviceRpc) {
78                         return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id,
79                                 deviceRpc));
80                     }
81                 }, stateSchemasResolver);
82     }
83
84     @VisibleForTesting
85     protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
86                             final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
87                             final SchemaContextProviderFactory schemaContextProviderFactory,
88                             final SchemaSourceProviderFactory<InputStream> sourceProviderFactory,
89                             final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
90         this.id = id;
91         this.messageTransformer = messageTransformer;
92         this.salFacade = salFacade;
93         this.sourceProviderFactory = sourceProviderFactory;
94         this.stateSchemasResolver = stateSchemasResolver;
95         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
96         this.schemaContextProviderFactory = schemaContextProviderFactory;
97         this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
98     }
99
100     @Override
101     public void onRemoteSessionUp(final NetconfSessionCapabilities remoteSessionCapabilities,
102                                   final RemoteDeviceCommunicator<NetconfMessage> listener) {
103         // SchemaContext setup has to be performed in a dedicated thread since
104         // we are in a netty thread in this method
105         // Yang models are being downloaded in this method and it would cause a
106         // deadlock if we used the netty thread
107         // http://netty.io/wiki/thread-model.html
108         logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
109
110         final ListenableFuture<?> salInitializationFuture = processingExecutor.submit(new Runnable() {
111             @Override
112             public void run() {
113                 final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener);
114
115                 final NetconfStateSchemas availableSchemas = stateSchemasResolver.resolve(deviceRpc, remoteSessionCapabilities, id);
116                 logger.warn("{}: Schemas exposed by ietf-netconf-monitoring: {}", id, availableSchemas.getAvailableYangSchemasQNames());
117                 // TODO use this for shared schema context
118
119                 final SchemaSourceProvider<InputStream> delegate = sourceProviderFactory.createSourceProvider(deviceRpc);
120                 final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
121                 updateMessageTransformer(schemaContextProvider);
122                 salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
123                 notificationHandler.onRemoteSchemaUp();
124             }
125         });
126
127         Futures.addCallback(salInitializationFuture, new FutureCallback<Object>() {
128             @Override
129             public void onSuccess(final Object result) {
130                 logger.debug("{}: Initialization in sal successful", id);
131                 logger.info("{}: Netconf connector initialized successfully", id);
132             }
133
134             @Override
135             public void onFailure(final Throwable t) {
136                 // Unable to initialize device, set as disconnected
137                 logger.error("{}: Initialization failed", id, t);
138                 salFacade.onDeviceDisconnected();
139                 // TODO ssh connection is still open if sal initialization fails
140             }
141         });
142     }
143
144     /**
145      * Update initial message transformer to use retrieved schema
146      */
147     private void updateMessageTransformer(final SchemaContextProvider schemaContextProvider) {
148         messageTransformer.onGlobalContextUpdated(schemaContextProvider.getSchemaContext());
149     }
150
151     private SchemaContextProvider setUpSchemaContext(final SchemaSourceProvider<InputStream> sourceProvider, final NetconfSessionCapabilities capabilities) {
152         return schemaContextProviderFactory.createContextProvider(capabilities.getModuleBasedCaps(), sourceProvider);
153     }
154
155     private NetconfDeviceRpc setUpDeviceRpc(final NetconfSessionCapabilities capHolder, final RemoteDeviceCommunicator<NetconfMessage> listener) {
156         Preconditions.checkArgument(capHolder.isMonitoringSupported(),
157                 "%s: Netconf device does not support netconf monitoring, yang schemas cannot be acquired. Netconf device capabilities", capHolder);
158         return new NetconfDeviceRpc(listener, messageTransformer);
159     }
160
161     @Override
162     public void onRemoteSessionDown() {
163         salFacade.onDeviceDisconnected();
164     }
165
166     @Override
167     public void onNotification(final NetconfMessage notification) {
168         notificationHandler.handleNotification(notification);
169     }
170
171     /**
172      * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
173      */
174     private final static class NotificationHandler {
175
176         private final RemoteDeviceHandler<?> salFacade;
177         private final List<NetconfMessage> cache = new LinkedList<>();
178         private final MessageTransformer<NetconfMessage> messageTransformer;
179         private boolean passNotifications = false;
180         private final RemoteDeviceId id;
181
182         NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
183             this.salFacade = salFacade;
184             this.messageTransformer = messageTransformer;
185             this.id = id;
186         }
187
188         synchronized void handleNotification(final NetconfMessage notification) {
189             if(passNotifications) {
190                 passNotification(messageTransformer.toNotification(notification));
191             } else {
192                 cacheNotification(notification);
193             }
194         }
195
196         /**
197          * Forward all cached notifications and pass all notifications from this point directly to sal facade.
198          */
199         synchronized void onRemoteSchemaUp() {
200             passNotifications = true;
201
202             for (final NetconfMessage cachedNotification : cache) {
203                 passNotification(messageTransformer.toNotification(cachedNotification));
204             }
205
206             cache.clear();
207         }
208
209         private void cacheNotification(final NetconfMessage notification) {
210             Preconditions.checkState(passNotifications == false);
211
212             logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
213             if(logger.isTraceEnabled()) {
214                 logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
215             }
216
217             cache.add(notification);
218         }
219
220         private void passNotification(final CompositeNode parsedNotification) {
221             logger.debug("{}: Forwarding notification {}", id, parsedNotification);
222             Preconditions.checkNotNull(parsedNotification);
223             salFacade.onNotification(parsedNotification);
224         }
225     }
226
227 }