Remove redundant code constructs
[netconf.git] / netconf / sal-netconf-connector / src / main / java / org / opendaylight / netconf / sal / connect / netconf / listener / NetconfDeviceCommunicator.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.connect.netconf.listener;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Strings;
12 import com.google.common.collect.Lists;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.util.concurrent.Future;
17 import java.util.ArrayDeque;
18 import java.util.Iterator;
19 import java.util.List;
20 import java.util.Queue;
21 import java.util.concurrent.Semaphore;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.concurrent.locks.Lock;
24 import java.util.concurrent.locks.ReentrantLock;
25 import org.opendaylight.netconf.api.FailedNetconfMessage;
26 import org.opendaylight.netconf.api.NetconfDocumentedException;
27 import org.opendaylight.netconf.api.NetconfMessage;
28 import org.opendaylight.netconf.api.NetconfTerminationReason;
29 import org.opendaylight.netconf.api.xml.XmlElement;
30 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
31 import org.opendaylight.netconf.api.xml.XmlUtil;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSession;
34 import org.opendaylight.netconf.client.NetconfClientSessionListener;
35 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
36 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
37 import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
38 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
39 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
40 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
41 import org.opendaylight.yangtools.yang.common.QName;
42 import org.opendaylight.yangtools.yang.common.RpcError;
43 import org.opendaylight.yangtools.yang.common.RpcResult;
44 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class NetconfDeviceCommunicator
49         implements NetconfClientSessionListener, RemoteDeviceCommunicator<NetconfMessage> {
50
51     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
52
53     protected final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
54     private final Optional<UserPreferences> overrideNetconfCapabilities;
55     protected final RemoteDeviceId id;
56     private final Lock sessionLock = new ReentrantLock();
57
58     private final Semaphore semaphore;
59     private final int concurentRpcMsgs;
60
61     private final Queue<Request> requests = new ArrayDeque<>();
62     private NetconfClientSession currentSession;
63
64     private Future<?> initFuture;
65     private final SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
66
67     // isSessionClosing indicates a close operation on the session is issued and
68     // tearDown will surely be called later to finish the close.
69     // Used to allow only one thread to enter tearDown and other threads should
70     // NOT enter it simultaneously and should end its close operation without
71     // calling tearDown to release the locks they hold to avoid deadlock.
72     private final AtomicBoolean isSessionClosing = new AtomicBoolean(false);
73
74     public Boolean isSessionClosing() {
75         return isSessionClosing.get();
76     }
77
78     public NetconfDeviceCommunicator(
79             final RemoteDeviceId id,
80             final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
81             final UserPreferences netconfSessionPreferences, final int rpcMessageLimit) {
82         this(id, remoteDevice, Optional.of(netconfSessionPreferences), rpcMessageLimit);
83     }
84
85     public NetconfDeviceCommunicator(
86             final RemoteDeviceId id,
87             final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
88             final int rpcMessageLimit) {
89         this(id, remoteDevice, Optional.absent(), rpcMessageLimit);
90     }
91
92     private NetconfDeviceCommunicator(
93             final RemoteDeviceId id,
94             final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
95             final Optional<UserPreferences> overrideNetconfCapabilities, final int rpcMessageLimit) {
96         this.concurentRpcMsgs = rpcMessageLimit;
97         this.id = id;
98         this.remoteDevice = remoteDevice;
99         this.overrideNetconfCapabilities = overrideNetconfCapabilities;
100         this.firstConnectionFuture = SettableFuture.create();
101         this.semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
102     }
103
104     @Override
105     public void onSessionUp(final NetconfClientSession session) {
106         sessionLock.lock();
107         try {
108             LOG.debug("{}: Session established", id);
109             currentSession = session;
110
111             NetconfSessionPreferences netconfSessionPreferences =
112                                              NetconfSessionPreferences.fromNetconfSession(session);
113             LOG.trace("{}: Session advertised capabilities: {}", id,
114                     netconfSessionPreferences);
115
116             if (overrideNetconfCapabilities.isPresent()) {
117                 final NetconfSessionPreferences sessionPreferences = overrideNetconfCapabilities
118                         .get().getSessionPreferences();
119                 netconfSessionPreferences = overrideNetconfCapabilities.get().moduleBasedCapsOverrided()
120                         ? netconfSessionPreferences.replaceModuleCaps(sessionPreferences)
121                         : netconfSessionPreferences.addModuleCaps(sessionPreferences);
122
123                 netconfSessionPreferences = overrideNetconfCapabilities.get().nonModuleBasedCapsOverrided()
124                         ? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences)
125                         : netconfSessionPreferences.addNonModuleCaps(sessionPreferences);
126                 LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id,
127                         netconfSessionPreferences);
128             }
129
130             remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
131             if (!firstConnectionFuture.isDone()) {
132                 firstConnectionFuture.set(netconfSessionPreferences.getNetconfDeviceCapabilities());
133             }
134         } finally {
135             sessionLock.unlock();
136         }
137     }
138
139     /**
140      * Initialize remote connection.
141      *
142      * @param dispatcher {@code NetconfCLientDispatcher}
143      * @param config     {@code NetconfClientConfiguration}
144      * @return future that returns succes on first succesfull connection and failure when the underlying
145      *     reconnecting strategy runs out of reconnection attempts
146      */
147     public ListenableFuture<NetconfDeviceCapabilities> initializeRemoteConnection(
148             final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
149         if (config instanceof NetconfReconnectingClientConfiguration) {
150             initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
151         } else {
152             initFuture = dispatcher.createClient(config);
153         }
154
155
156         initFuture.addListener(future -> {
157             if (!future.isSuccess() && !future.isCancelled()) {
158                 LOG.debug("{}: Connection failed", id, future.cause());
159                 NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause());
160                 if (firstConnectionFuture.isDone()) {
161                     firstConnectionFuture.setException(future.cause());
162                 }
163             }
164         });
165         return firstConnectionFuture;
166     }
167
168     public void disconnect() {
169         // If session is already in closing, no need to close it again
170         if (currentSession != null && isSessionClosing.compareAndSet(false, true)) {
171             currentSession.close();
172         }
173     }
174
175     private void tearDown(final String reason) {
176         if (!isSessionClosing()) {
177             LOG.warn("It's curious that no one to close the session but tearDown is called!");
178         }
179         LOG.debug("Tearing down {}", reason);
180         final List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = Lists.newArrayList();
181         sessionLock.lock();
182         try {
183             if (currentSession != null) {
184                 currentSession = null;
185                 /*
186                  * Walk all requests, check if they have been executing
187                  * or cancelled and remove them from the queue.
188                  */
189                 final Iterator<Request> it = requests.iterator();
190                 while (it.hasNext()) {
191                     final Request r = it.next();
192                     if (r.future.isUncancellable()) {
193                         futuresToCancel.add(r.future);
194                         it.remove();
195                     } else if (r.future.isCancelled()) {
196                         // This just does some house-cleaning
197                         it.remove();
198                     }
199                 }
200
201                 remoteDevice.onRemoteSessionDown();
202             }
203         } finally {
204             sessionLock.unlock();
205         }
206
207         // Notify pending request futures outside of the sessionLock to avoid unnecessarily
208         // blocking the caller.
209         for (final UncancellableFuture<RpcResult<NetconfMessage>> future : futuresToCancel) {
210             if (Strings.isNullOrEmpty(reason)) {
211                 future.set(createSessionDownRpcResult());
212             } else {
213                 future.set(createErrorRpcResult(RpcError.ErrorType.TRANSPORT, reason));
214             }
215         }
216
217         isSessionClosing.set(false);
218     }
219
220     private RpcResult<NetconfMessage> createSessionDownRpcResult() {
221         return createErrorRpcResult(RpcError.ErrorType.TRANSPORT,
222                 String.format("The netconf session to %1$s is disconnected", id.getName()));
223     }
224
225     private static RpcResult<NetconfMessage> createErrorRpcResult(final RpcError.ErrorType errorType,
226             final String message) {
227         return RpcResultBuilder.<NetconfMessage>failed()
228             .withError(errorType, NetconfDocumentedException.ErrorTag.OPERATION_FAILED.getTagValue(), message).build();
229     }
230
231     @Override
232     public void onSessionDown(final NetconfClientSession session, final Exception exception) {
233         // If session is already in closing, no need to call tearDown again.
234         if (isSessionClosing.compareAndSet(false, true)) {
235             LOG.warn("{}: Session went down", id, exception);
236             tearDown(null);
237         }
238     }
239
240     @Override
241     public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
242         // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing.
243         LOG.warn("{}: Session terminated {}", id, reason);
244         tearDown(reason.getErrorMessage());
245     }
246
247     @Override
248     public void close() {
249         // Cancel reconnect if in progress
250         if (initFuture != null) {
251             initFuture.cancel(false);
252         }
253         // Disconnect from device
254         // tear down not necessary, called indirectly by the close in disconnect()
255         disconnect();
256     }
257
258     @Override
259     public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
260         /*
261          * Dispatch between notifications and messages. Messages need to be processed
262          * with lock held, notifications do not.
263          */
264         if (isNotification(message)) {
265             processNotification(message);
266         } else {
267             processMessage(message);
268         }
269     }
270
271     private void processMessage(final NetconfMessage message) {
272         Request request = null;
273         sessionLock.lock();
274
275         try {
276             request = requests.peek();
277             if (request != null && request.future.isUncancellable()) {
278                 request = requests.poll();
279                 // we have just removed one request from the queue
280                 // we can also release one permit
281                 if (semaphore != null) {
282                     semaphore.release();
283                 }
284             } else {
285                 request = null;
286                 LOG.warn("{}: Ignoring unsolicited message {}", id,
287                         msgToS(message));
288             }
289         } finally {
290             sessionLock.unlock();
291         }
292
293         if (request != null) {
294
295             if (FailedNetconfMessage.class.isInstance(message)) {
296                 request.future.set(NetconfMessageTransformUtil.toRpcResult((FailedNetconfMessage) message));
297                 return;
298             }
299
300             LOG.debug("{}: Message received {}", id, message);
301
302             if (LOG.isTraceEnabled()) {
303                 LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message));
304             }
305
306             try {
307                 NetconfMessageTransformUtil.checkValidReply(request.request, message);
308             } catch (final NetconfDocumentedException e) {
309                 LOG.warn(
310                         "{}: Invalid request-reply match,"
311                                 + "reply message contains different message-id, request: {}, response: {}",
312                         id, msgToS(request.request), msgToS(message), e);
313
314                 request.future.set(RpcResultBuilder.<NetconfMessage>failed()
315                         .withRpcError(NetconfMessageTransformUtil.toRpcError(e)).build());
316
317                 //recursively processing message to eventually find matching request
318                 processMessage(message);
319
320                 return;
321             }
322
323             try {
324                 NetconfMessageTransformUtil.checkSuccessReply(message);
325             } catch (final NetconfDocumentedException e) {
326                 LOG.warn(
327                         "{}: Error reply from remote device, request: {}, response: {}",
328                         id, msgToS(request.request), msgToS(message), e);
329
330                 request.future.set(RpcResultBuilder.<NetconfMessage>failed()
331                         .withRpcError(NetconfMessageTransformUtil.toRpcError(e)).build());
332                 return;
333             }
334
335             request.future.set(RpcResultBuilder.success(message).build());
336         }
337     }
338
339     private static String msgToS(final NetconfMessage msg) {
340         return XmlUtil.toString(msg.getDocument());
341     }
342
343     @Override
344     public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
345         sessionLock.lock();
346         try {
347             if (semaphore != null && !semaphore.tryAcquire()) {
348                 LOG.warn("Limit of concurrent rpc messages was reached (limit :" + concurentRpcMsgs
349                     + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName());
350                 return Futures.immediateFailedFuture(new NetconfDocumentedException(
351                         "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
352                         + ") waiting for emptying the queue of Netconf device with id" + id.getName()));
353             }
354
355             return sendRequestWithLock(message, rpc);
356         } finally {
357             sessionLock.unlock();
358         }
359     }
360
361     private ListenableFuture<RpcResult<NetconfMessage>> sendRequestWithLock(final NetconfMessage message,
362                                                                             final QName rpc) {
363         if (LOG.isTraceEnabled()) {
364             LOG.trace("{}: Sending message {}", id, msgToS(message));
365         }
366
367         if (currentSession == null) {
368             LOG.warn("{}: Session is disconnected, failing RPC request {}",
369                     id, message);
370             return Futures.immediateFuture(createSessionDownRpcResult());
371         }
372
373         final Request req = new Request(new UncancellableFuture<>(true), message);
374         requests.add(req);
375
376         currentSession.sendMessage(req.request).addListener(future -> {
377             if (!future.isSuccess()) {
378                 // We expect that a session down will occur at this point
379                 LOG.debug("{}: Failed to send request {}", id,
380                         XmlUtil.toString(req.request.getDocument()),
381                         future.cause());
382
383                 if (future.cause() != null) {
384                     req.future.set(createErrorRpcResult(RpcError.ErrorType.TRANSPORT,
385                             future.cause().getLocalizedMessage()));
386                 } else {
387                     req.future.set(createSessionDownRpcResult()); // assume session is down
388                 }
389                 req.future.setException(future.cause());
390             } else {
391                 LOG.trace("Finished sending request {}", req.request);
392             }
393         });
394
395         return req.future;
396     }
397
398     private void processNotification(final NetconfMessage notification) {
399         if (LOG.isTraceEnabled()) {
400             LOG.trace("{}: Notification received: {}", id, notification);
401         }
402
403         remoteDevice.onNotification(notification);
404     }
405
406     private static boolean isNotification(final NetconfMessage message) {
407         if (message.getDocument() == null) {
408             // We have no message, which mean we have a FailedNetconfMessage
409             return false;
410         }
411         final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
412         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
413     }
414
415     private static final class Request {
416         final UncancellableFuture<RpcResult<NetconfMessage>> future;
417         final NetconfMessage request;
418
419         private Request(final UncancellableFuture<RpcResult<NetconfMessage>> future,
420                         final NetconfMessage request) {
421             this.future = future;
422             this.request = request;
423         }
424     }
425 }