aa8afbc664ac362a3667b724e41cf8b8e69379fc
[netconf.git] / opendaylight / netconf / sal-netconf-connector / src / main / java / org / opendaylight / netconf / sal / connect / netconf / listener / NetconfDeviceCommunicator.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.connect.netconf.listener;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Strings;
12 import com.google.common.collect.Lists;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.util.concurrent.Future;
17 import io.netty.util.concurrent.FutureListener;
18 import io.netty.util.concurrent.GenericFutureListener;
19 import java.util.ArrayDeque;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.Queue;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReentrantLock;
26 import org.opendaylight.controller.config.util.xml.XmlElement;
27 import org.opendaylight.controller.config.util.xml.XmlUtil;
28 import org.opendaylight.netconf.api.NetconfDocumentedException;
29 import org.opendaylight.netconf.api.NetconfMessage;
30 import org.opendaylight.netconf.api.NetconfTerminationReason;
31 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSession;
34 import org.opendaylight.netconf.client.NetconfClientSessionListener;
35 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
36 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
37 import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
38 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
39 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
40 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
41 import org.opendaylight.yangtools.yang.common.QName;
42 import org.opendaylight.yangtools.yang.common.RpcError;
43 import org.opendaylight.yangtools.yang.common.RpcResult;
44 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator<NetconfMessage> {
49
50     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
51
52     protected final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
53     private final Optional<UserPreferences> overrideNetconfCapabilities;
54     protected final RemoteDeviceId id;
55     private final Lock sessionLock = new ReentrantLock();
56
57     // TODO implement concurrent message limit
58     private final Queue<Request> requests = new ArrayDeque<>();
59     private NetconfClientSession session;
60
61     private Future<?> initFuture;
62     private SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
63
64     // isSessionClosing indicates a close operation on the session is issued and
65     // tearDown will surely be called later to finish the close.
66     // Used to allow only one thread to enter tearDown and other threads should
67     // NOT enter it simultaneously and should end its close operation without
68     // calling tearDown to release the locks they hold to avoid deadlock.
69     private volatile AtomicBoolean isSessionClosing = new AtomicBoolean(false);
70
71     public Boolean isSessionClosing() {
72         return isSessionClosing.get();
73     }
74
75     public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
76             final UserPreferences NetconfSessionPreferences) {
77         this(id, remoteDevice, Optional.of(NetconfSessionPreferences));
78     }
79
80     public NetconfDeviceCommunicator(final RemoteDeviceId id,
81                                      final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice) {
82         this(id, remoteDevice, Optional.<UserPreferences>absent());
83     }
84
85     private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
86             final Optional<UserPreferences> overrideNetconfCapabilities) {
87         this.id = id;
88         this.remoteDevice = remoteDevice;
89         this.overrideNetconfCapabilities = overrideNetconfCapabilities;
90         this.firstConnectionFuture = SettableFuture.create();
91     }
92
93     @Override
94     public void onSessionUp(final NetconfClientSession session) {
95         sessionLock.lock();
96         try {
97             LOG.debug("{}: Session established", id);
98             this.session = session;
99
100             NetconfSessionPreferences netconfSessionPreferences =
101                                              NetconfSessionPreferences.fromNetconfSession(session);
102             LOG.trace("{}: Session advertised capabilities: {}", id,
103                     netconfSessionPreferences);
104
105             if(overrideNetconfCapabilities.isPresent()) {
106                 netconfSessionPreferences = overrideNetconfCapabilities.get().isOverride() ?
107                         netconfSessionPreferences.replaceModuleCaps(overrideNetconfCapabilities.get().getSessionPreferences()) :
108                         netconfSessionPreferences.addModuleCaps(overrideNetconfCapabilities.get().getSessionPreferences());
109                 LOG.debug(
110                         "{}: Session capabilities overridden, capabilities that will be used: {}",
111                         id, netconfSessionPreferences);
112             }
113
114
115             remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
116             if (!firstConnectionFuture.isDone()) {
117                 firstConnectionFuture.set(netconfSessionPreferences.getNetconfDeviceCapabilities());
118             }
119         }
120         finally {
121             sessionLock.unlock();
122         }
123     }
124
125     /**
126      *
127      * @param dispatcher
128      * @param config
129      * @return future that returns succes on first succesfull connection and failure when the underlying
130      * reconnecting strategy runs out of reconnection attempts
131      */
132     public ListenableFuture<NetconfDeviceCapabilities> initializeRemoteConnection(final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
133         // TODO 2313 extract listener from configuration
134         if(config instanceof NetconfReconnectingClientConfiguration) {
135             initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
136         } else {
137             initFuture = dispatcher.createClient(config);
138         }
139
140
141         initFuture.addListener(new GenericFutureListener<Future<Object>>(){
142
143             @Override
144             public void operationComplete(Future<Object> future) throws Exception {
145                 if (!future.isSuccess() && !future.isCancelled()) {
146                     LOG.debug("{}: Connection failed", id, future.cause());
147                     NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause());
148                     if (firstConnectionFuture.isDone()) {
149                         firstConnectionFuture.setException(future.cause());
150                     }
151                 }
152             }
153         });
154         return firstConnectionFuture;
155     }
156
157     public void disconnect() {
158         // If session is already in closing, no need to close it again
159         if(session != null && isSessionClosing.compareAndSet(false, true)) {
160             session.close();
161         }
162     }
163
164     private void tearDown( String reason ) {
165         if (!isSessionClosing()) {
166             LOG.warn("It's curious that no one to close the session but tearDown is called!");
167         }
168         LOG.debug("Tearing down {}", reason);
169         List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = Lists.newArrayList();
170         sessionLock.lock();
171         try {
172             if( session != null ) {
173                 session = null;
174
175                 /*
176                  * Walk all requests, check if they have been executing
177                  * or cancelled and remove them from the queue.
178                  */
179                 final Iterator<Request> it = requests.iterator();
180                 while (it.hasNext()) {
181                     final Request r = it.next();
182                     if (r.future.isUncancellable()) {
183                         futuresToCancel.add( r.future );
184                         it.remove();
185                     } else if (r.future.isCancelled()) {
186                         // This just does some house-cleaning
187                         it.remove();
188                     }
189                 }
190
191                 remoteDevice.onRemoteSessionDown();
192             }
193         }
194         finally {
195             sessionLock.unlock();
196         }
197
198         // Notify pending request futures outside of the sessionLock to avoid unnecessarily
199         // blocking the caller.
200         for( UncancellableFuture<RpcResult<NetconfMessage>> future: futuresToCancel ) {
201             if( Strings.isNullOrEmpty( reason ) ) {
202                 future.set( createSessionDownRpcResult() );
203             } else {
204                 future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, reason ) );
205             }
206         }
207
208         isSessionClosing.set(false);
209     }
210
211     private RpcResult<NetconfMessage> createSessionDownRpcResult() {
212         return createErrorRpcResult( RpcError.ErrorType.TRANSPORT,
213                              String.format( "The netconf session to %1$s is disconnected", id.getName() ) );
214     }
215
216     private RpcResult<NetconfMessage> createErrorRpcResult( RpcError.ErrorType errorType, String message ) {
217         return RpcResultBuilder.<NetconfMessage>failed()
218                 .withError(errorType, NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(), message).build();
219     }
220
221     @Override
222     public void onSessionDown(final NetconfClientSession session, final Exception e) {
223         // If session is already in closing, no need to call tearDown again.
224         if (isSessionClosing.compareAndSet(false, true)) {
225             LOG.warn("{}: Session went down", id, e);
226             tearDown( null );
227         }
228     }
229
230     @Override
231     public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
232         // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing.
233         LOG.warn("{}: Session terminated {}", id, reason);
234         tearDown( reason.getErrorMessage() );
235     }
236
237     @Override
238     public void close() {
239         // Cancel reconnect if in progress
240         if(initFuture != null) {
241             initFuture.cancel(false);
242         }
243         // Disconnect from device
244         if(session != null) {
245             session.close();
246             // tear down not necessary, called indirectly by above close
247         }
248     }
249
250     @Override
251     public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
252         /*
253          * Dispatch between notifications and messages. Messages need to be processed
254          * with lock held, notifications do not.
255          */
256         if (isNotification(message)) {
257             processNotification(message);
258         } else {
259             processMessage(message);
260         }
261     }
262
263     private void processMessage(final NetconfMessage message) {
264         Request request = null;
265         sessionLock.lock();
266
267         try {
268             request = requests.peek();
269             if (request != null && request.future.isUncancellable()) {
270                 requests.poll();
271             } else {
272                 request = null;
273                 LOG.warn("{}: Ignoring unsolicited message {}", id,
274                         msgToS(message));
275             }
276         }
277         finally {
278             sessionLock.unlock();
279         }
280
281         if( request != null ) {
282
283             LOG.debug("{}: Message received {}", id, message);
284
285             if(LOG.isTraceEnabled()) {
286                 LOG.trace( "{}: Matched request: {} to response: {}", id, msgToS( request.request ), msgToS( message ) );
287             }
288
289             try {
290                 NetconfMessageTransformUtil.checkValidReply( request.request, message );
291             } catch (final NetconfDocumentedException e) {
292                 LOG.warn(
293                         "{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}",
294                         id, msgToS(request.request), msgToS(message), e);
295
296                 request.future.set( RpcResultBuilder.<NetconfMessage>failed()
297                         .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() );
298
299                 //recursively processing message to eventually find matching request
300                 processMessage(message);
301
302                 return;
303             }
304
305             try {
306                 NetconfMessageTransformUtil.checkSuccessReply(message);
307             } catch(final NetconfDocumentedException e) {
308                 LOG.warn(
309                         "{}: Error reply from remote device, request: {}, response: {}",
310                         id, msgToS(request.request), msgToS(message), e);
311
312                 request.future.set( RpcResultBuilder.<NetconfMessage>failed()
313                         .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() );
314                 return;
315             }
316
317             request.future.set( RpcResultBuilder.success( message ).build() );
318         }
319     }
320
321     private static String msgToS(final NetconfMessage msg) {
322         return XmlUtil.toString(msg.getDocument());
323     }
324
325     @Override
326     public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
327         sessionLock.lock();
328         try {
329             return sendRequestWithLock( message, rpc );
330         } finally {
331             sessionLock.unlock();
332         }
333     }
334
335     private ListenableFuture<RpcResult<NetconfMessage>> sendRequestWithLock(
336                                                final NetconfMessage message, final QName rpc) {
337         if(LOG.isTraceEnabled()) {
338             LOG.trace("{}: Sending message {}", id, msgToS(message));
339         }
340
341         if (session == null) {
342             LOG.warn("{}: Session is disconnected, failing RPC request {}",
343                     id, message);
344             return Futures.immediateFuture( createSessionDownRpcResult() );
345         }
346
347         final Request req = new Request( new UncancellableFuture<RpcResult<NetconfMessage>>(true),
348                                          message );
349         requests.add(req);
350
351         session.sendMessage(req.request).addListener(new FutureListener<Void>() {
352             @Override
353             public void operationComplete(final Future<Void> future) throws Exception {
354                 if( !future.isSuccess() ) {
355                     // We expect that a session down will occur at this point
356                     LOG.debug("{}: Failed to send request {}", id,
357                             XmlUtil.toString(req.request.getDocument()),
358                             future.cause());
359
360                     if( future.cause() != null ) {
361                         req.future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT,
362                                                               future.cause().getLocalizedMessage() ) );
363                     } else {
364                         req.future.set( createSessionDownRpcResult() ); // assume session is down
365                     }
366                     req.future.setException( future.cause() );
367                 }
368                 else {
369                     LOG.trace("Finished sending request {}", req.request);
370                 }
371             }
372         });
373
374         return req.future;
375     }
376
377     private void processNotification(final NetconfMessage notification) {
378         if(LOG.isTraceEnabled()) {
379             LOG.trace("{}: Notification received: {}", id, notification);
380         }
381
382         remoteDevice.onNotification(notification);
383     }
384
385     private static boolean isNotification(final NetconfMessage message) {
386         final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
387         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
388     }
389
390     private static final class Request {
391         final UncancellableFuture<RpcResult<NetconfMessage>> future;
392         final NetconfMessage request;
393
394         private Request(final UncancellableFuture<RpcResult<NetconfMessage>> future,
395                         final NetconfMessage request) {
396             this.future = future;
397             this.request = request;
398         }
399     }
400 }