Do a proper disconnect when deleting a connector.
[netconf.git] / opendaylight / netconf / sal-netconf-connector / src / main / java / org / opendaylight / netconf / sal / connect / netconf / listener / NetconfDeviceCommunicator.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.connect.netconf.listener;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Strings;
12 import com.google.common.collect.Lists;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.util.concurrent.Future;
17 import io.netty.util.concurrent.FutureListener;
18 import io.netty.util.concurrent.GenericFutureListener;
19 import java.util.ArrayDeque;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.Queue;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReentrantLock;
26 import org.opendaylight.controller.config.util.xml.XmlElement;
27 import org.opendaylight.controller.config.util.xml.XmlUtil;
28 import org.opendaylight.netconf.api.NetconfDocumentedException;
29 import org.opendaylight.netconf.api.NetconfMessage;
30 import org.opendaylight.netconf.api.NetconfTerminationReason;
31 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSession;
34 import org.opendaylight.netconf.client.NetconfClientSessionListener;
35 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
36 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
37 import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
38 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
39 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
40 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
41 import org.opendaylight.yangtools.yang.common.QName;
42 import org.opendaylight.yangtools.yang.common.RpcError;
43 import org.opendaylight.yangtools.yang.common.RpcResult;
44 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator<NetconfMessage> {
49
50     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
51
52     protected final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
53     private final Optional<UserPreferences> overrideNetconfCapabilities;
54     protected final RemoteDeviceId id;
55     private final Lock sessionLock = new ReentrantLock();
56
57     // TODO implement concurrent message limit
58     private final Queue<Request> requests = new ArrayDeque<>();
59     private NetconfClientSession session;
60
61     private Future<?> initFuture;
62     private SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
63
64     // isSessionClosing indicates a close operation on the session is issued and
65     // tearDown will surely be called later to finish the close.
66     // Used to allow only one thread to enter tearDown and other threads should
67     // NOT enter it simultaneously and should end its close operation without
68     // calling tearDown to release the locks they hold to avoid deadlock.
69     private volatile AtomicBoolean isSessionClosing = new AtomicBoolean(false);
70
71     public Boolean isSessionClosing() {
72         return isSessionClosing.get();
73     }
74
75     public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
76             final UserPreferences NetconfSessionPreferences) {
77         this(id, remoteDevice, Optional.of(NetconfSessionPreferences));
78     }
79
80     public NetconfDeviceCommunicator(final RemoteDeviceId id,
81                                      final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice) {
82         this(id, remoteDevice, Optional.<UserPreferences>absent());
83     }
84
85     private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
86             final Optional<UserPreferences> overrideNetconfCapabilities) {
87         this.id = id;
88         this.remoteDevice = remoteDevice;
89         this.overrideNetconfCapabilities = overrideNetconfCapabilities;
90         this.firstConnectionFuture = SettableFuture.create();
91     }
92
93     @Override
94     public void onSessionUp(final NetconfClientSession session) {
95         sessionLock.lock();
96         try {
97             LOG.debug("{}: Session established", id);
98             this.session = session;
99
100             NetconfSessionPreferences netconfSessionPreferences =
101                                              NetconfSessionPreferences.fromNetconfSession(session);
102             LOG.trace("{}: Session advertised capabilities: {}", id,
103                     netconfSessionPreferences);
104
105             if(overrideNetconfCapabilities.isPresent()) {
106                 netconfSessionPreferences = overrideNetconfCapabilities.get().isOverride() ?
107                         netconfSessionPreferences.replaceModuleCaps(overrideNetconfCapabilities.get().getSessionPreferences()) :
108                         netconfSessionPreferences.addModuleCaps(overrideNetconfCapabilities.get().getSessionPreferences());
109                 LOG.debug(
110                         "{}: Session capabilities overridden, capabilities that will be used: {}",
111                         id, netconfSessionPreferences);
112             }
113
114
115             remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
116             if (!firstConnectionFuture.isDone()) {
117                 firstConnectionFuture.set(netconfSessionPreferences.getNetconfDeviceCapabilities());
118             }
119         }
120         finally {
121             sessionLock.unlock();
122         }
123     }
124
125     /**
126      *
127      * @param dispatcher
128      * @param config
129      * @return future that returns succes on first succesfull connection and failure when the underlying
130      * reconnecting strategy runs out of reconnection attempts
131      */
132     public ListenableFuture<NetconfDeviceCapabilities> initializeRemoteConnection(final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
133         // TODO 2313 extract listener from configuration
134         if(config instanceof NetconfReconnectingClientConfiguration) {
135             initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
136         } else {
137             initFuture = dispatcher.createClient(config);
138         }
139
140
141         initFuture.addListener(new GenericFutureListener<Future<Object>>(){
142
143             @Override
144             public void operationComplete(Future<Object> future) throws Exception {
145                 if (!future.isSuccess() && !future.isCancelled()) {
146                     LOG.debug("{}: Connection failed", id, future.cause());
147                     NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause());
148                     if (firstConnectionFuture.isDone()) {
149                         firstConnectionFuture.setException(future.cause());
150                     }
151                 }
152             }
153         });
154         return firstConnectionFuture;
155     }
156
157     public void disconnect() {
158         // If session is already in closing, no need to close it again
159         if(session != null && isSessionClosing.compareAndSet(false, true)) {
160             session.close();
161         }
162     }
163
164     private void tearDown( String reason ) {
165         if (!isSessionClosing()) {
166             LOG.warn("It's curious that no one to close the session but tearDown is called!");
167         }
168         LOG.debug("Tearing down {}", reason);
169         List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = Lists.newArrayList();
170         sessionLock.lock();
171         try {
172             if( session != null ) {
173                 session = null;
174
175                 /*
176                  * Walk all requests, check if they have been executing
177                  * or cancelled and remove them from the queue.
178                  */
179                 final Iterator<Request> it = requests.iterator();
180                 while (it.hasNext()) {
181                     final Request r = it.next();
182                     if (r.future.isUncancellable()) {
183                         futuresToCancel.add( r.future );
184                         it.remove();
185                     } else if (r.future.isCancelled()) {
186                         // This just does some house-cleaning
187                         it.remove();
188                     }
189                 }
190
191                 remoteDevice.onRemoteSessionDown();
192             }
193         }
194         finally {
195             sessionLock.unlock();
196         }
197
198         // Notify pending request futures outside of the sessionLock to avoid unnecessarily
199         // blocking the caller.
200         for( UncancellableFuture<RpcResult<NetconfMessage>> future: futuresToCancel ) {
201             if( Strings.isNullOrEmpty( reason ) ) {
202                 future.set( createSessionDownRpcResult() );
203             } else {
204                 future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, reason ) );
205             }
206         }
207
208         isSessionClosing.set(false);
209     }
210
211     private RpcResult<NetconfMessage> createSessionDownRpcResult() {
212         return createErrorRpcResult( RpcError.ErrorType.TRANSPORT,
213                              String.format( "The netconf session to %1$s is disconnected", id.getName() ) );
214     }
215
216     private RpcResult<NetconfMessage> createErrorRpcResult( RpcError.ErrorType errorType, String message ) {
217         return RpcResultBuilder.<NetconfMessage>failed()
218                 .withError(errorType, NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(), message).build();
219     }
220
221     @Override
222     public void onSessionDown(final NetconfClientSession session, final Exception e) {
223         // If session is already in closing, no need to call tearDown again.
224         if (isSessionClosing.compareAndSet(false, true)) {
225             LOG.warn("{}: Session went down", id, e);
226             tearDown( null );
227         }
228     }
229
230     @Override
231     public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
232         // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing.
233         LOG.warn("{}: Session terminated {}", id, reason);
234         tearDown( reason.getErrorMessage() );
235     }
236
237     @Override
238     public void close() {
239         // Cancel reconnect if in progress
240         if(initFuture != null) {
241             initFuture.cancel(false);
242         }
243         // Disconnect from device
244         // tear down not necessary, called indirectly by the close in disconnect()
245         disconnect();
246     }
247
248     @Override
249     public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
250         /*
251          * Dispatch between notifications and messages. Messages need to be processed
252          * with lock held, notifications do not.
253          */
254         if (isNotification(message)) {
255             processNotification(message);
256         } else {
257             processMessage(message);
258         }
259     }
260
261     private void processMessage(final NetconfMessage message) {
262         Request request = null;
263         sessionLock.lock();
264
265         try {
266             request = requests.peek();
267             if (request != null && request.future.isUncancellable()) {
268                 requests.poll();
269             } else {
270                 request = null;
271                 LOG.warn("{}: Ignoring unsolicited message {}", id,
272                         msgToS(message));
273             }
274         }
275         finally {
276             sessionLock.unlock();
277         }
278
279         if( request != null ) {
280
281             LOG.debug("{}: Message received {}", id, message);
282
283             if(LOG.isTraceEnabled()) {
284                 LOG.trace( "{}: Matched request: {} to response: {}", id, msgToS( request.request ), msgToS( message ) );
285             }
286
287             try {
288                 NetconfMessageTransformUtil.checkValidReply( request.request, message );
289             } catch (final NetconfDocumentedException e) {
290                 LOG.warn(
291                         "{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}",
292                         id, msgToS(request.request), msgToS(message), e);
293
294                 request.future.set( RpcResultBuilder.<NetconfMessage>failed()
295                         .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() );
296
297                 //recursively processing message to eventually find matching request
298                 processMessage(message);
299
300                 return;
301             }
302
303             try {
304                 NetconfMessageTransformUtil.checkSuccessReply(message);
305             } catch(final NetconfDocumentedException e) {
306                 LOG.warn(
307                         "{}: Error reply from remote device, request: {}, response: {}",
308                         id, msgToS(request.request), msgToS(message), e);
309
310                 request.future.set( RpcResultBuilder.<NetconfMessage>failed()
311                         .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() );
312                 return;
313             }
314
315             request.future.set( RpcResultBuilder.success( message ).build() );
316         }
317     }
318
319     private static String msgToS(final NetconfMessage msg) {
320         return XmlUtil.toString(msg.getDocument());
321     }
322
323     @Override
324     public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
325         sessionLock.lock();
326         try {
327             return sendRequestWithLock( message, rpc );
328         } finally {
329             sessionLock.unlock();
330         }
331     }
332
333     private ListenableFuture<RpcResult<NetconfMessage>> sendRequestWithLock(
334                                                final NetconfMessage message, final QName rpc) {
335         if(LOG.isTraceEnabled()) {
336             LOG.trace("{}: Sending message {}", id, msgToS(message));
337         }
338
339         if (session == null) {
340             LOG.warn("{}: Session is disconnected, failing RPC request {}",
341                     id, message);
342             return Futures.immediateFuture( createSessionDownRpcResult() );
343         }
344
345         final Request req = new Request( new UncancellableFuture<RpcResult<NetconfMessage>>(true),
346                                          message );
347         requests.add(req);
348
349         session.sendMessage(req.request).addListener(new FutureListener<Void>() {
350             @Override
351             public void operationComplete(final Future<Void> future) throws Exception {
352                 if( !future.isSuccess() ) {
353                     // We expect that a session down will occur at this point
354                     LOG.debug("{}: Failed to send request {}", id,
355                             XmlUtil.toString(req.request.getDocument()),
356                             future.cause());
357
358                     if( future.cause() != null ) {
359                         req.future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT,
360                                                               future.cause().getLocalizedMessage() ) );
361                     } else {
362                         req.future.set( createSessionDownRpcResult() ); // assume session is down
363                     }
364                     req.future.setException( future.cause() );
365                 }
366                 else {
367                     LOG.trace("Finished sending request {}", req.request);
368                 }
369             }
370         });
371
372         return req.future;
373     }
374
375     private void processNotification(final NetconfMessage notification) {
376         if(LOG.isTraceEnabled()) {
377             LOG.trace("{}: Notification received: {}", id, notification);
378         }
379
380         remoteDevice.onNotification(notification);
381     }
382
383     private static boolean isNotification(final NetconfMessage message) {
384         final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
385         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
386     }
387
388     private static final class Request {
389         final UncancellableFuture<RpcResult<NetconfMessage>> future;
390         final NetconfMessage request;
391
392         private Request(final UncancellableFuture<RpcResult<NetconfMessage>> future,
393                         final NetconfMessage request) {
394             this.future = future;
395             this.request = request;
396         }
397     }
398 }