Bump upstreams
[netconf.git] / plugins / netconf-client-mdsal / src / main / java / org / opendaylight / netconf / client / mdsal / NetconfDeviceCommunicator.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.client.mdsal;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Strings;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.io.EOFException;
18 import java.lang.invoke.MethodHandles;
19 import java.lang.invoke.VarHandle;
20 import java.util.ArrayDeque;
21 import java.util.ArrayList;
22 import java.util.Queue;
23 import java.util.concurrent.Semaphore;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReentrantLock;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.opendaylight.netconf.api.DocumentedException;
29 import org.opendaylight.netconf.api.NetconfTerminationReason;
30 import org.opendaylight.netconf.api.messages.NetconfMessage;
31 import org.opendaylight.netconf.api.messages.NotificationMessage;
32 import org.opendaylight.netconf.api.xml.XmlElement;
33 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
34 import org.opendaylight.netconf.api.xml.XmlUtil;
35 import org.opendaylight.netconf.client.NetconfClientSession;
36 import org.opendaylight.netconf.client.NetconfClientSessionListener;
37 import org.opendaylight.netconf.client.NetconfMessageUtil;
38 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceCommunicator;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
42 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
43 import org.opendaylight.yangtools.yang.common.ErrorTag;
44 import org.opendaylight.yangtools.yang.common.ErrorType;
45 import org.opendaylight.yangtools.yang.common.QName;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator {
53     private record Request(
54             @NonNull UncancellableFuture<RpcResult<NetconfMessage>> future,
55             @NonNull NetconfMessage request) {
56         Request {
57             requireNonNull(future);
58             requireNonNull(request);
59         }
60     }
61
62     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
63     private static final VarHandle CLOSING;
64
65     static {
66         try {
67             CLOSING = MethodHandles.lookup().findVarHandle(NetconfDeviceCommunicator.class, "closing", boolean.class);
68         } catch (NoSuchFieldException | IllegalAccessException e) {
69             throw new ExceptionInInitializerError(e);
70         }
71     }
72
73     protected final RemoteDevice<NetconfDeviceCommunicator> remoteDevice;
74     private final @Nullable UserPreferences overrideNetconfCapabilities;
75     protected final RemoteDeviceId id;
76     private final Lock sessionLock = new ReentrantLock();
77
78     private final Semaphore semaphore;
79     private final int concurentRpcMsgs;
80
81     private final Queue<Request> requests = new ArrayDeque<>();
82     private NetconfClientSession currentSession;
83
84     // isSessionClosing indicates a close operation on the session is issued and tearDown will surely be called later
85     // to finish the close.
86     // Used to allow only one thread to enter tearDown and other threads should NOT enter it simultaneously and should
87     // end its close operation without calling tearDown to release the locks they hold to avoid deadlock.
88     //
89     // This field is manipulated using CLOSING VarHandle
90     @SuppressWarnings("unused")
91     @SuppressFBWarnings(value = "UUF_UNUSED_FIELD", justification = "https://github.com/spotbugs/spotbugs/issues/2749")
92     private volatile boolean closing;
93
94     public boolean isSessionClosing() {
95         return (boolean) CLOSING.getVolatile(this);
96     }
97
98     public NetconfDeviceCommunicator(final RemoteDeviceId id,
99             final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit) {
100         this(id, remoteDevice, rpcMessageLimit, null);
101     }
102
103     public NetconfDeviceCommunicator(final RemoteDeviceId id,
104             final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit,
105             final @Nullable UserPreferences overrideNetconfCapabilities) {
106         concurentRpcMsgs = rpcMessageLimit;
107         this.id = id;
108         this.remoteDevice = remoteDevice;
109         this.overrideNetconfCapabilities = overrideNetconfCapabilities;
110         semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
111     }
112
113     @Override
114     public void onSessionUp(final NetconfClientSession session) {
115         sessionLock.lock();
116         try {
117             LOG.debug("{}: Session established", id);
118             currentSession = session;
119
120             var netconfSessionPreferences = NetconfSessionPreferences.fromNetconfSession(session);
121             LOG.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences);
122
123             final var localOverride = overrideNetconfCapabilities;
124             if (localOverride != null) {
125                 final var sessionPreferences = localOverride.sessionPreferences();
126                 netconfSessionPreferences = localOverride.overrideModuleCapabilities()
127                         ? netconfSessionPreferences.replaceModuleCaps(sessionPreferences)
128                         : netconfSessionPreferences.addModuleCaps(sessionPreferences);
129
130                 netconfSessionPreferences = localOverride.overrideNonModuleCapabilities()
131                         ? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences)
132                         : netconfSessionPreferences.addNonModuleCaps(sessionPreferences);
133                 LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id,
134                         netconfSessionPreferences);
135             }
136
137             remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
138         } finally {
139             sessionLock.unlock();
140         }
141     }
142
143     public void disconnect() {
144         // If session is already in closing, no need to close it again
145         if (currentSession != null && CLOSING.compareAndSet(this, false, true) && currentSession.isUp()) {
146             currentSession.close();
147         }
148     }
149
150     private void tearDown(final String reason) {
151         if (!isSessionClosing()) {
152             LOG.warn("It's curious that no one to close the session but tearDown is called!");
153         }
154         LOG.debug("Tearing down {}", reason);
155         final var futuresToCancel = new ArrayList<UncancellableFuture<RpcResult<NetconfMessage>>>();
156         sessionLock.lock();
157         try {
158             if (currentSession != null) {
159                 currentSession = null;
160                 /*
161                  * Walk all requests, check if they have been executing
162                  * or cancelled and remove them from the queue.
163                  */
164                 final var it = requests.iterator();
165                 while (it.hasNext()) {
166                     final var r = it.next();
167                     if (r.future.isUncancellable()) {
168                         futuresToCancel.add(r.future);
169                         it.remove();
170                     } else if (r.future.isCancelled()) {
171                         // This just does some house-cleaning
172                         it.remove();
173                     }
174                 }
175
176                 remoteDevice.onRemoteSessionDown();
177             }
178         } finally {
179             sessionLock.unlock();
180         }
181
182         // Notify pending request futures outside of the sessionLock to avoid unnecessarily blocking the caller.
183         for (var future : futuresToCancel) {
184             if (Strings.isNullOrEmpty(reason)) {
185                 future.set(createSessionDownRpcResult());
186             } else {
187                 future.set(createErrorRpcResult(ErrorType.TRANSPORT, reason));
188             }
189         }
190
191         CLOSING.setVolatile(this, false);
192     }
193
194     private RpcResult<NetconfMessage> createSessionDownRpcResult() {
195         return createErrorRpcResult(ErrorType.TRANSPORT,
196             "The netconf session to %1$s is disconnected".formatted(id.name()));
197     }
198
199     private static RpcResult<NetconfMessage> createErrorRpcResult(final ErrorType errorType, final String message) {
200         return RpcResultBuilder.<NetconfMessage>failed()
201             .withError(errorType, ErrorTag.OPERATION_FAILED, message)
202             .build();
203     }
204
205     @Override
206     public void onSessionDown(final NetconfClientSession session, final Exception exception) {
207         // If session is already in closing, no need to call tearDown again.
208         if (CLOSING.compareAndSet(this, false, true)) {
209             if (exception instanceof EOFException) {
210                 LOG.info("{}: Session went down: {}", id, exception.getMessage());
211             } else {
212                 LOG.warn("{}: Session went down", id, exception);
213             }
214             tearDown(null);
215         }
216     }
217
218     @Override
219     public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
220         // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing.
221         LOG.warn("{}: Session terminated {}", id, reason);
222         tearDown(reason.getErrorMessage());
223     }
224
225     @Override
226     public void close() {
227         // Disconnect from device
228         // tear down not necessary, called indirectly by the close in disconnect()
229         disconnect();
230     }
231
232     @Override
233     public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
234         // Dispatch between notifications and messages. Messages need to be processed with lock held, notifications do
235         // not.
236         if (NotificationMessage.ELEMENT_NAME.equals(XmlElement.fromDomDocument(message.getDocument()).getName())) {
237             if (LOG.isTraceEnabled()) {
238                 LOG.trace("{}: Notification received: {}", id, message);
239             }
240             remoteDevice.onNotification(message);
241         } else {
242             processMessage(message);
243         }
244     }
245
246     @Override
247     public void onError(final NetconfClientSession session, final Exception failure) {
248         final var request = pollRequest();
249         if (request != null) {
250             request.future.set(RpcResultBuilder.<NetconfMessage>failed()
251                 .withRpcError(toRpcError(new DocumentedException(failure.getMessage(),
252                     ErrorType.APPLICATION, ErrorTag.MALFORMED_MESSAGE, ErrorSeverity.ERROR)))
253                 .build());
254         } else {
255             LOG.warn("{}: Ignoring unsolicited failure {}", id, failure.toString());
256         }
257     }
258
259     private @Nullable Request pollRequest() {
260         sessionLock.lock();
261         try {
262             var request = requests.peek();
263             if (request != null && request.future.isUncancellable()) {
264                 request = requests.poll();
265                 // we have just removed one request from the queue
266                 // we can also release one permit
267                 if (semaphore != null) {
268                     semaphore.release();
269                 }
270                 return request;
271             }
272             return null;
273         } finally {
274             sessionLock.unlock();
275         }
276     }
277
278     private void processMessage(final NetconfMessage message) {
279         final var request = pollRequest();
280         if (request == null) {
281             // No matching request, bail out
282             if (LOG.isWarnEnabled()) {
283                 LOG.warn("{}: Ignoring unsolicited message {}", id, msgToS(message));
284             }
285             return;
286         }
287
288         LOG.debug("{}: Message received {}", id, message);
289         if (LOG.isTraceEnabled()) {
290             LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message));
291         }
292
293         final var inputMsgId = request.request.getDocument().getDocumentElement()
294             .getAttribute(XmlNetconfConstants.MESSAGE_ID);
295         final var outputMsgId = message.getDocument().getDocumentElement()
296             .getAttribute(XmlNetconfConstants.MESSAGE_ID);
297         if (!inputMsgId.equals(outputMsgId)) {
298             // FIXME: we should be able to transform directly to RpcError without an intermediate exception
299             final var ex = new DocumentedException("Response message contained unknown \"message-id\"", null,
300                 ErrorType.PROTOCOL, ErrorTag.BAD_ATTRIBUTE, ErrorSeverity.ERROR,
301                 ImmutableMap.of("actual-message-id", outputMsgId, "expected-message-id", inputMsgId));
302             if (LOG.isWarnEnabled()) {
303                 LOG.warn("{}: Invalid request-reply match, reply message contains different message-id, request: {}, "
304                     + "response: {}", id, msgToS(request.request), msgToS(message));
305             }
306             request.future.set(RpcResultBuilder.<NetconfMessage>failed().withRpcError(toRpcError(ex)).build());
307
308             // recursively processing message to eventually find matching request
309             processMessage(message);
310             return;
311         }
312
313         final RpcResult<NetconfMessage> result;
314         if (NetconfMessageUtil.isErrorMessage(message)) {
315             // FIXME: we should be able to transform directly to RpcError without an intermediate exception
316             final var ex = DocumentedException.fromXMLDocument(message.getDocument());
317             if (LOG.isWarnEnabled()) {
318                 LOG.warn("{}: Error reply from remote device, request: {}, response: {}", id, msgToS(request.request),
319                     msgToS(message));
320             }
321             result = RpcResultBuilder.<NetconfMessage>failed().withRpcError(toRpcError(ex)).build();
322         } else {
323             result = RpcResultBuilder.success(message).build();
324         }
325
326         request.future.set(result);
327     }
328
329     private static String msgToS(final NetconfMessage msg) {
330         return XmlUtil.toString(msg.getDocument());
331     }
332
333     private static RpcError toRpcError(final DocumentedException ex) {
334         final var errorInfo = ex.getErrorInfo();
335         final String infoString;
336         if (errorInfo != null) {
337             final var sb = new StringBuilder();
338             for (var e : errorInfo.entrySet()) {
339                 final var tag = e.getKey();
340                 sb.append('<').append(tag).append('>').append(e.getValue()).append("</").append(tag).append('>');
341             }
342             infoString = sb.toString();
343         } else {
344             infoString = "";
345         }
346
347         return ex.getErrorSeverity() == ErrorSeverity.ERROR
348             ? RpcResultBuilder.newError(ex.getErrorType(), ex.getErrorTag(), ex.getLocalizedMessage(), null,
349                 infoString, ex.getCause())
350             : RpcResultBuilder.newWarning(ex.getErrorType(), ex.getErrorTag(), ex.getLocalizedMessage(), null,
351                 infoString, ex.getCause());
352     }
353
354     @Override
355     public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
356         sessionLock.lock();
357         try {
358             if (semaphore != null && !semaphore.tryAcquire()) {
359                 LOG.warn("Limit of concurrent rpc messages was reached (limit: {}). Rpc reply message is needed. "
360                     + "Discarding request of Netconf device with id: {}", concurentRpcMsgs, id.name());
361                 return Futures.immediateFailedFuture(new DocumentedException(
362                         "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
363                         + ") waiting for emptying the queue of Netconf device with id: " + id.name()));
364             }
365
366             return sendRequestWithLock(message, rpc);
367         } finally {
368             sessionLock.unlock();
369         }
370     }
371
372     private ListenableFuture<RpcResult<NetconfMessage>> sendRequestWithLock(final NetconfMessage message,
373             final QName rpc) {
374         if (LOG.isTraceEnabled()) {
375             LOG.trace("{}: Sending message {}", id, msgToS(message));
376         }
377
378         if (currentSession == null) {
379             LOG.warn("{}: Session is disconnected, failing RPC request {}", id, message);
380             return Futures.immediateFuture(createSessionDownRpcResult());
381         }
382
383         final var req = new Request(new UncancellableFuture<>(true), message);
384         requests.add(req);
385
386         currentSession.sendMessage(req.request).addListener(future -> {
387             if (!future.isSuccess()) {
388                 // We expect that a session down will occur at this point
389                 final var cause = future.cause();
390                 if (LOG.isDebugEnabled()) {
391                     LOG.debug("{}: Failed to send request {}", id, XmlUtil.toString(req.request.getDocument()), cause);
392                 }
393
394                 final RpcResult<NetconfMessage> result;
395                 if (cause == null) {
396                     // assume session is down
397                     result = createSessionDownRpcResult();
398                 } else {
399                     result = createErrorRpcResult(ErrorType.TRANSPORT, cause.getLocalizedMessage());
400                 }
401                 req.future.set(result);
402             } else {
403                 LOG.trace("Finished sending request {}", req.request);
404             }
405         });
406
407         return req.future;
408     }
409 }