Bump upstreams
[netconf.git] / plugins / netconf-client-mdsal / src / main / java / org / opendaylight / netconf / client / mdsal / NetconfDeviceCommunicator.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.client.mdsal;
9
10 import com.google.common.base.Strings;
11 import com.google.common.collect.ImmutableMap;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import io.netty.util.concurrent.Future;
16 import java.io.EOFException;
17 import java.util.ArrayDeque;
18 import java.util.ArrayList;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Queue;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReentrantLock;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.opendaylight.netconf.api.DocumentedException;
28 import org.opendaylight.netconf.api.NetconfMessage;
29 import org.opendaylight.netconf.api.NetconfTerminationReason;
30 import org.opendaylight.netconf.api.xml.XmlElement;
31 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
32 import org.opendaylight.netconf.api.xml.XmlUtil;
33 import org.opendaylight.netconf.client.NetconfClientDispatcher;
34 import org.opendaylight.netconf.client.NetconfClientSession;
35 import org.opendaylight.netconf.client.NetconfClientSessionListener;
36 import org.opendaylight.netconf.client.NetconfMessageUtil;
37 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
38 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
39 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceCommunicator;
42 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
43 import org.opendaylight.yangtools.yang.common.Empty;
44 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
45 import org.opendaylight.yangtools.yang.common.ErrorTag;
46 import org.opendaylight.yangtools.yang.common.ErrorType;
47 import org.opendaylight.yangtools.yang.common.QName;
48 import org.opendaylight.yangtools.yang.common.RpcError;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator {
55     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
56
57     protected final RemoteDevice<NetconfDeviceCommunicator> remoteDevice;
58     private final @Nullable UserPreferences overrideNetconfCapabilities;
59     protected final RemoteDeviceId id;
60     private final Lock sessionLock = new ReentrantLock();
61
62     private final Semaphore semaphore;
63     private final int concurentRpcMsgs;
64
65     private final Queue<Request> requests = new ArrayDeque<>();
66     private NetconfClientSession currentSession;
67
68     private final SettableFuture<Empty> firstConnectionFuture = SettableFuture.create();
69     private Future<?> taskFuture;
70
71     // isSessionClosing indicates a close operation on the session is issued and
72     // tearDown will surely be called later to finish the close.
73     // Used to allow only one thread to enter tearDown and other threads should
74     // NOT enter it simultaneously and should end its close operation without
75     // calling tearDown to release the locks they hold to avoid deadlock.
76     private static final AtomicIntegerFieldUpdater<NetconfDeviceCommunicator> CLOSING_UPDATER =
77             AtomicIntegerFieldUpdater.newUpdater(NetconfDeviceCommunicator.class, "closing");
78     private volatile int closing;
79
80     public boolean isSessionClosing() {
81         return closing != 0;
82     }
83
84     public NetconfDeviceCommunicator(final RemoteDeviceId id,
85             final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit) {
86         this(id, remoteDevice, rpcMessageLimit, null);
87     }
88
89     public NetconfDeviceCommunicator(final RemoteDeviceId id,
90             final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit,
91             final @Nullable UserPreferences overrideNetconfCapabilities) {
92         concurentRpcMsgs = rpcMessageLimit;
93         this.id = id;
94         this.remoteDevice = remoteDevice;
95         this.overrideNetconfCapabilities = overrideNetconfCapabilities;
96         semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
97     }
98
99     @Override
100     public void onSessionUp(final NetconfClientSession session) {
101         sessionLock.lock();
102         try {
103             LOG.debug("{}: Session established", id);
104             currentSession = session;
105
106             var netconfSessionPreferences = NetconfSessionPreferences.fromNetconfSession(session);
107             LOG.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences);
108
109             final var localOverride = overrideNetconfCapabilities;
110             if (localOverride != null) {
111                 final var sessionPreferences = localOverride.sessionPreferences();
112                 netconfSessionPreferences = localOverride.overrideModuleCapabilities()
113                         ? netconfSessionPreferences.replaceModuleCaps(sessionPreferences)
114                         : netconfSessionPreferences.addModuleCaps(sessionPreferences);
115
116                 netconfSessionPreferences = localOverride.overrideNonModuleCapabilities()
117                         ? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences)
118                         : netconfSessionPreferences.addNonModuleCaps(sessionPreferences);
119                 LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id,
120                         netconfSessionPreferences);
121             }
122
123             remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
124         } finally {
125             sessionLock.unlock();
126         }
127
128         // FIXME: right, except ... this does not include the device schema setup, so is it really useful?
129         if (!firstConnectionFuture.set(Empty.value())) {
130             LOG.trace("{}: First connection already completed", id);
131         }
132     }
133
134     /**
135      * Initialize remote connection.
136      *
137      * @param dispatcher {@code NetconfCLientDispatcher}
138      * @param config     {@code NetconfClientConfiguration}
139      * @return a ListenableFuture that returns success on first successful connection and failure when the underlying
140      *         reconnecting strategy runs out of reconnection attempts
141      */
142     public ListenableFuture<Empty> initializeRemoteConnection(final NetconfClientDispatcher dispatcher,
143             final NetconfClientConfiguration config) {
144
145         final Future<?> connectFuture;
146         if (config instanceof NetconfReconnectingClientConfiguration) {
147             // FIXME: This is weird. If I understand it correctly we want to know about the first connection so as to
148             //        forward error state. Analyze the call graph to understand what is going on here. We really want
149             //        to move reconnection away from the socket layer, so that it can properly interface with sessions
150             //        and generally has some event-driven state (as all good network glue does). There is a second story
151             //        which is we want to avoid duplicate code, so it depends on other users as well.
152             final var future = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
153             taskFuture = future;
154             connectFuture = future.firstSessionFuture();
155         } else {
156             taskFuture = connectFuture = dispatcher.createClient(config);
157         }
158
159         connectFuture.addListener(future -> {
160             if (!future.isSuccess() && !future.isCancelled()) {
161                 LOG.debug("{}: Connection failed", id, future.cause());
162                 remoteDevice.onRemoteSessionFailed(future.cause());
163                 if (!firstConnectionFuture.isDone()) {
164                     firstConnectionFuture.setException(future.cause());
165                 }
166             }
167         });
168         return firstConnectionFuture;
169     }
170
171     public void disconnect() {
172         // If session is already in closing, no need to close it again
173         if (currentSession != null && startClosing() && currentSession.isUp()) {
174             currentSession.close();
175         }
176     }
177
178     private void tearDown(final String reason) {
179         if (!isSessionClosing()) {
180             LOG.warn("It's curious that no one to close the session but tearDown is called!");
181         }
182         LOG.debug("Tearing down {}", reason);
183         final List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = new ArrayList<>();
184         sessionLock.lock();
185         try {
186             if (currentSession != null) {
187                 currentSession = null;
188                 /*
189                  * Walk all requests, check if they have been executing
190                  * or cancelled and remove them from the queue.
191                  */
192                 final Iterator<Request> it = requests.iterator();
193                 while (it.hasNext()) {
194                     final Request r = it.next();
195                     if (r.future.isUncancellable()) {
196                         futuresToCancel.add(r.future);
197                         it.remove();
198                     } else if (r.future.isCancelled()) {
199                         // This just does some house-cleaning
200                         it.remove();
201                     }
202                 }
203
204                 remoteDevice.onRemoteSessionDown();
205             }
206         } finally {
207             sessionLock.unlock();
208         }
209
210         // Notify pending request futures outside of the sessionLock to avoid unnecessarily
211         // blocking the caller.
212         for (final UncancellableFuture<RpcResult<NetconfMessage>> future : futuresToCancel) {
213             if (Strings.isNullOrEmpty(reason)) {
214                 future.set(createSessionDownRpcResult());
215             } else {
216                 future.set(createErrorRpcResult(ErrorType.TRANSPORT, reason));
217             }
218         }
219
220         closing = 0;
221     }
222
223     private RpcResult<NetconfMessage> createSessionDownRpcResult() {
224         return createErrorRpcResult(ErrorType.TRANSPORT,
225                 String.format("The netconf session to %1$s is disconnected", id.name()));
226     }
227
228     private static RpcResult<NetconfMessage> createErrorRpcResult(final ErrorType errorType, final String message) {
229         return RpcResultBuilder.<NetconfMessage>failed()
230             .withError(errorType, ErrorTag.OPERATION_FAILED, message).build();
231     }
232
233     @Override
234     public void onSessionDown(final NetconfClientSession session, final Exception exception) {
235         // If session is already in closing, no need to call tearDown again.
236         if (startClosing()) {
237             if (exception instanceof EOFException) {
238                 LOG.info("{}: Session went down: {}", id, exception.getMessage());
239             } else {
240                 LOG.warn("{}: Session went down", id, exception);
241             }
242             tearDown(null);
243         }
244     }
245
246     @Override
247     public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
248         // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing.
249         LOG.warn("{}: Session terminated {}", id, reason);
250         tearDown(reason.getErrorMessage());
251     }
252
253     @Override
254     public void close() {
255         // Cancel reconnect if in progress
256         if (taskFuture != null) {
257             taskFuture.cancel(false);
258         }
259         // Disconnect from device
260         // tear down not necessary, called indirectly by the close in disconnect()
261         disconnect();
262     }
263
264     @Override
265     public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
266         /*
267          * Dispatch between notifications and messages. Messages need to be processed
268          * with lock held, notifications do not.
269          */
270         if (isNotification(message)) {
271             processNotification(message);
272         } else {
273             processMessage(message);
274         }
275     }
276
277     @Override
278     public void onError(final NetconfClientSession session, final Exception failure) {
279         final Request request = pollRequest();
280         if (request != null) {
281             request.future.set(RpcResultBuilder.<NetconfMessage>failed()
282                 .withRpcError(toRpcError(new DocumentedException(failure.getMessage(),
283                     ErrorType.APPLICATION, ErrorTag.MALFORMED_MESSAGE, ErrorSeverity.ERROR)))
284                 .build());
285         } else {
286             LOG.warn("{}: Ignoring unsolicited failure {}", id, failure.toString());
287         }
288     }
289
290     private @Nullable Request pollRequest() {
291         Request request;
292         sessionLock.lock();
293
294         try {
295             request = requests.peek();
296             if (request != null && request.future.isUncancellable()) {
297                 request = requests.poll();
298                 // we have just removed one request from the queue
299                 // we can also release one permit
300                 if (semaphore != null) {
301                     semaphore.release();
302                 }
303             } else {
304                 request = null;
305             }
306         } finally {
307             sessionLock.unlock();
308         }
309         return request;
310     }
311
312     private void processMessage(final NetconfMessage message) {
313         final Request request = pollRequest();
314         if (request == null) {
315             // No matching request, bail out
316             LOG.warn("{}: Ignoring unsolicited message {}", id, msgToS(message));
317             return;
318         }
319
320         LOG.debug("{}: Message received {}", id, message);
321
322         if (LOG.isTraceEnabled()) {
323             LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message));
324         }
325
326         final String inputMsgId = request.request.getDocument().getDocumentElement()
327             .getAttribute(XmlNetconfConstants.MESSAGE_ID);
328         final String outputMsgId = message.getDocument().getDocumentElement()
329             .getAttribute(XmlNetconfConstants.MESSAGE_ID);
330         if (!inputMsgId.equals(outputMsgId)) {
331             // FIXME: we should be able to transform directly to RpcError without an intermediate exception
332             final var ex = new DocumentedException("Response message contained unknown \"message-id\"", null,
333                 ErrorType.PROTOCOL, ErrorTag.BAD_ATTRIBUTE, ErrorSeverity.ERROR,
334                 ImmutableMap.of("actual-message-id", outputMsgId, "expected-message-id", inputMsgId));
335             LOG.warn("{}: Invalid request-reply match, reply message contains different message-id, "
336                 + "request: {}, response: {}", id, msgToS(request.request), msgToS(message));
337
338             request.future.set(RpcResultBuilder.<NetconfMessage>failed().withRpcError(toRpcError(ex)).build());
339
340             // recursively processing message to eventually find matching request
341             processMessage(message);
342             return;
343         }
344
345         if (NetconfMessageUtil.isErrorMessage(message)) {
346             // FIXME: we should be able to transform directly to RpcError without an intermediate exception
347             final var ex = DocumentedException.fromXMLDocument(message.getDocument());
348             LOG.warn("{}: Error reply from remote device, request: {}, response: {}",
349                 id, msgToS(request.request), msgToS(message));
350             request.future.set(RpcResultBuilder.<NetconfMessage>failed().withRpcError(toRpcError(ex)).build());
351             return;
352         }
353
354         request.future.set(RpcResultBuilder.success(message).build());
355     }
356
357     private static String msgToS(final NetconfMessage msg) {
358         return XmlUtil.toString(msg.getDocument());
359     }
360
361     private static RpcError toRpcError(final DocumentedException ex) {
362         final var errorInfo = ex.getErrorInfo();
363         final String infoString;
364         if (errorInfo != null) {
365             final var sb = new StringBuilder();
366             for (var e : errorInfo.entrySet()) {
367                 final var tag = e.getKey();
368                 sb.append('<').append(tag).append('>').append(e.getValue()).append("</").append(tag).append('>');
369             }
370             infoString = sb.toString();
371         } else {
372             infoString = "";
373         }
374
375         return ex.getErrorSeverity() == ErrorSeverity.ERROR
376             ? RpcResultBuilder.newError(ex.getErrorType(), ex.getErrorTag(), ex.getLocalizedMessage(), null,
377                 infoString, ex.getCause())
378             : RpcResultBuilder.newWarning(ex.getErrorType(), ex.getErrorTag(), ex.getLocalizedMessage(), null,
379                 infoString, ex.getCause());
380     }
381
382     @Override
383     public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
384         sessionLock.lock();
385         try {
386             if (semaphore != null && !semaphore.tryAcquire()) {
387                 LOG.warn("Limit of concurrent rpc messages was reached (limit: {}). Rpc reply message is needed. "
388                     + "Discarding request of Netconf device with id: {}", concurentRpcMsgs, id.name());
389                 return Futures.immediateFailedFuture(new DocumentedException(
390                         "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
391                         + ") waiting for emptying the queue of Netconf device with id: " + id.name()));
392             }
393
394             return sendRequestWithLock(message, rpc);
395         } finally {
396             sessionLock.unlock();
397         }
398     }
399
400     private ListenableFuture<RpcResult<NetconfMessage>> sendRequestWithLock(final NetconfMessage message,
401                                                                             final QName rpc) {
402         if (LOG.isTraceEnabled()) {
403             LOG.trace("{}: Sending message {}", id, msgToS(message));
404         }
405
406         if (currentSession == null) {
407             LOG.warn("{}: Session is disconnected, failing RPC request {}",
408                     id, message);
409             return Futures.immediateFuture(createSessionDownRpcResult());
410         }
411
412         final Request req = new Request(new UncancellableFuture<>(true), message);
413         requests.add(req);
414
415         currentSession.sendMessage(req.request).addListener(future -> {
416             if (!future.isSuccess()) {
417                 // We expect that a session down will occur at this point
418                 LOG.debug("{}: Failed to send request {}", id,
419                         XmlUtil.toString(req.request.getDocument()),
420                         future.cause());
421
422                 if (future.cause() != null) {
423                     req.future.set(createErrorRpcResult(ErrorType.TRANSPORT, future.cause().getLocalizedMessage()));
424                 } else {
425                     req.future.set(createSessionDownRpcResult()); // assume session is down
426                 }
427                 req.future.setException(future.cause());
428             } else {
429                 LOG.trace("Finished sending request {}", req.request);
430             }
431         });
432
433         return req.future;
434     }
435
436     private void processNotification(final NetconfMessage notification) {
437         if (LOG.isTraceEnabled()) {
438             LOG.trace("{}: Notification received: {}", id, notification);
439         }
440
441         remoteDevice.onNotification(notification);
442     }
443
444     private static boolean isNotification(final NetconfMessage message) {
445         if (message.getDocument() == null) {
446             // We have no message, which mean we have a FailedNetconfMessage
447             return false;
448         }
449         final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
450         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
451     }
452
453     private static final class Request {
454         final UncancellableFuture<RpcResult<NetconfMessage>> future;
455         final NetconfMessage request;
456
457         private Request(final UncancellableFuture<RpcResult<NetconfMessage>> future,
458                         final NetconfMessage request) {
459             this.future = future;
460             this.request = request;
461         }
462     }
463
464     private boolean startClosing() {
465         return CLOSING_UPDATER.compareAndSet(this, 0, 1);
466     }
467 }