Merge "BUG-628 Allow configuration to override module based capabilities from remote...
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.io.InputStream;
18 import java.util.LinkedList;
19 import java.util.List;
20 import java.util.concurrent.ExecutorService;
21 import org.opendaylight.controller.netconf.api.NetconfMessage;
22 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
23 import org.opendaylight.controller.sal.connect.api.MessageTransformer;
24 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
25 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
26 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
27 import org.opendaylight.controller.sal.connect.api.SchemaContextProviderFactory;
28 import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory;
29 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
30 import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
31 import org.opendaylight.controller.sal.connect.netconf.schema.NetconfDeviceSchemaProviderFactory;
32 import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaSourceProvider;
33 import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
34 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
35 import org.opendaylight.controller.sal.core.api.RpcImplementation;
36 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
38 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
39 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
45  */
46 public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilities, NetconfMessage> {
47
48     private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
49
50     private final RemoteDeviceId id;
51
52     private final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade;
53     private final ListeningExecutorService processingExecutor;
54     private final MessageTransformer<NetconfMessage> messageTransformer;
55     private final SchemaContextProviderFactory schemaContextProviderFactory;
56     private final SchemaSourceProviderFactory<InputStream> sourceProviderFactory;
57     private final NotificationHandler notificationHandler;
58
59     public static NetconfDevice createNetconfDevice(final RemoteDeviceId id,
60             final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider,
61             final ExecutorService executor, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
62
63         return new NetconfDevice(id, salFacade, executor, new NetconfMessageTransformer(),
64                 new NetconfDeviceSchemaProviderFactory(id), new SchemaSourceProviderFactory<InputStream>() {
65                     @Override
66                     public SchemaSourceProvider<InputStream> createSourceProvider(final RpcImplementation deviceRpc) {
67                         return schemaSourceProvider.createInstanceFor(new NetconfRemoteSchemaSourceProvider(id,
68                                 deviceRpc));
69                     }
70                 });
71     }
72
73     @VisibleForTesting
74     protected NetconfDevice(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
75             final ExecutorService processingExecutor, final MessageTransformer<NetconfMessage> messageTransformer,
76             final SchemaContextProviderFactory schemaContextProviderFactory,
77             final SchemaSourceProviderFactory<InputStream> sourceProviderFactory) {
78         this.id = id;
79         this.messageTransformer = messageTransformer;
80         this.salFacade = salFacade;
81         this.sourceProviderFactory = sourceProviderFactory;
82         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor);
83         this.schemaContextProviderFactory = schemaContextProviderFactory;
84         this.notificationHandler = new NotificationHandler(salFacade, messageTransformer, id);
85     }
86
87     @Override
88     public void onRemoteSessionUp(final NetconfSessionCapabilities remoteSessionCapabilities,
89                                   final RemoteDeviceCommunicator<NetconfMessage> listener) {
90         // SchemaContext setup has to be performed in a dedicated thread since
91         // we are in a netty thread in this method
92         // Yang models are being downloaded in this method and it would cause a
93         // deadlock if we used the netty thread
94         // http://netty.io/wiki/thread-model.html
95         logger.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
96
97         final ListenableFuture<?> salInitializationFuture = processingExecutor.submit(new Runnable() {
98             @Override
99             public void run() {
100                 final NetconfDeviceRpc deviceRpc = setUpDeviceRpc(remoteSessionCapabilities, listener);
101                 final SchemaSourceProvider<InputStream> delegate = sourceProviderFactory.createSourceProvider(deviceRpc);
102                 final SchemaContextProvider schemaContextProvider = setUpSchemaContext(delegate, remoteSessionCapabilities);
103                 updateMessageTransformer(schemaContextProvider);
104                 salFacade.onDeviceConnected(schemaContextProvider, remoteSessionCapabilities, deviceRpc);
105                 notificationHandler.onRemoteSchemaUp();
106             }
107         });
108
109         Futures.addCallback(salInitializationFuture, new FutureCallback<Object>() {
110             @Override
111             public void onSuccess(final Object result) {
112                 logger.debug("{}: Initialization in sal successful", id);
113                 logger.info("{}: Netconf connector initialized successfully", id);
114             }
115
116             @Override
117             public void onFailure(final Throwable t) {
118                 // Unable to initialize device, set as disconnected
119                 logger.error("{}: Initialization failed", id, t);
120                 salFacade.onDeviceDisconnected();
121                 // TODO ssh connection is still open if sal initialization fails
122             }
123         });
124     }
125
126     /**
127      * Update initial message transformer to use retrieved schema
128      */
129     private void updateMessageTransformer(final SchemaContextProvider schemaContextProvider) {
130         messageTransformer.onGlobalContextUpdated(schemaContextProvider.getSchemaContext());
131     }
132
133     private SchemaContextProvider setUpSchemaContext(final SchemaSourceProvider<InputStream> sourceProvider, final NetconfSessionCapabilities capabilities) {
134         return schemaContextProviderFactory.createContextProvider(capabilities.getModuleBasedCaps(), sourceProvider);
135     }
136
137     private NetconfDeviceRpc setUpDeviceRpc(final NetconfSessionCapabilities capHolder, final RemoteDeviceCommunicator<NetconfMessage> listener) {
138         Preconditions.checkArgument(capHolder.isMonitoringSupported(),
139                 "%s: Netconf device does not support netconf monitoring, yang schemas cannot be acquired. Netconf device capabilities", capHolder);
140         return new NetconfDeviceRpc(listener, messageTransformer);
141     }
142
143     @Override
144     public void onRemoteSessionDown() {
145         salFacade.onDeviceDisconnected();
146     }
147
148     @Override
149     public void onNotification(final NetconfMessage notification) {
150         notificationHandler.handleNotification(notification);
151     }
152
153     /**
154      * Handles incoming notifications. Either caches them(until onRemoteSchemaUp is called) or passes to sal Facade.
155      */
156     private final static class NotificationHandler {
157
158         private final RemoteDeviceHandler<?> salFacade;
159         private final List<NetconfMessage> cache = new LinkedList<>();
160         private final MessageTransformer<NetconfMessage> messageTransformer;
161         private boolean passNotifications = false;
162         private final RemoteDeviceId id;
163
164         NotificationHandler(final RemoteDeviceHandler<?> salFacade, final MessageTransformer<NetconfMessage> messageTransformer, final RemoteDeviceId id) {
165             this.salFacade = salFacade;
166             this.messageTransformer = messageTransformer;
167             this.id = id;
168         }
169
170         synchronized void handleNotification(final NetconfMessage notification) {
171             if(passNotifications) {
172                 passNotification(messageTransformer.toNotification(notification));
173             } else {
174                 cacheNotification(notification);
175             }
176         }
177
178         /**
179          * Forward all cached notifications and pass all notifications from this point directly to sal facade.
180          */
181         synchronized void onRemoteSchemaUp() {
182             passNotifications = true;
183
184             for (final NetconfMessage cachedNotification : cache) {
185                 passNotification(messageTransformer.toNotification(cachedNotification));
186             }
187
188             cache.clear();
189         }
190
191         private void cacheNotification(final NetconfMessage notification) {
192             Preconditions.checkState(passNotifications == false);
193
194             logger.debug("{}: Caching notification {}, remote schema not yet fully built", id, notification);
195             if(logger.isTraceEnabled()) {
196                 logger.trace("{}: Caching notification {}", id, XmlUtil.toString(notification.getDocument()));
197             }
198
199             cache.add(notification);
200         }
201
202         private void passNotification(final CompositeNode parsedNotification) {
203             logger.debug("{}: Forwarding notification {}", id, parsedNotification);
204             Preconditions.checkNotNull(parsedNotification);
205             salFacade.onNotification(parsedNotification);
206         }
207
208     }
209 }