3c90e6d78858cebe8092a661521164cc523caed5
[netconf.git] / plugins / netconf-client-mdsal / src / main / java / org / opendaylight / netconf / client / mdsal / NetconfDeviceCommunicator.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.client.mdsal;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Strings;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.io.EOFException;
17 import java.lang.invoke.MethodHandles;
18 import java.lang.invoke.VarHandle;
19 import java.util.ArrayDeque;
20 import java.util.ArrayList;
21 import java.util.Queue;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.locks.Lock;
24 import java.util.concurrent.locks.ReentrantLock;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.opendaylight.netconf.api.DocumentedException;
28 import org.opendaylight.netconf.api.NetconfTerminationReason;
29 import org.opendaylight.netconf.api.messages.NetconfMessage;
30 import org.opendaylight.netconf.api.messages.NotificationMessage;
31 import org.opendaylight.netconf.api.xml.XmlElement;
32 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
33 import org.opendaylight.netconf.api.xml.XmlUtil;
34 import org.opendaylight.netconf.client.NetconfClientSession;
35 import org.opendaylight.netconf.client.NetconfClientSessionListener;
36 import org.opendaylight.netconf.client.NetconfMessageUtil;
37 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
38 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceCommunicator;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
41 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
42 import org.opendaylight.yangtools.yang.common.ErrorTag;
43 import org.opendaylight.yangtools.yang.common.ErrorType;
44 import org.opendaylight.yangtools.yang.common.QName;
45 import org.opendaylight.yangtools.yang.common.RpcError;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator {
    /**
     * An entry of the request queue: a message handed to the session together with the future that is completed
     * once the outcome of that message is known.
     *
     * @param future future returned to the caller of {@code sendRequest}, must not be {@code null}
     * @param request message passed to the session, must not be {@code null}
     */
    private record Request(
            @NonNull UncancellableFuture<RpcResult<NetconfMessage>> future,
            @NonNull NetconfMessage request) {
        Request {
            // Reject nulls at construction time rather than when the entry is eventually dequeued
            requireNonNull(future);
            requireNonNull(request);
        }
    }
60
    private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
    // Handle giving volatile and compare-and-set access to the 'closing' instance field
    private static final VarHandle CLOSING;

    static {
        try {
            CLOSING = MethodHandles.lookup().findVarHandle(NetconfDeviceCommunicator.class, "closing", boolean.class);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            // The field is declared in this very class, so a failed lookup leaves the class unusable
            throw new ExceptionInInitializerError(e);
        }
    }
71
    protected final RemoteDevice<NetconfDeviceCommunicator> remoteDevice;
    // Optional user-supplied capabilities, merged into or substituted for the advertised ones in onSessionUp()
    private final @Nullable UserPreferences overrideNetconfCapabilities;
    protected final RemoteDeviceId id;
    // Taken by onSessionUp(), tearDown(), pollRequest() and sendRequest() around their use of currentSession and
    // the requests queue. Note that disconnect() reads currentSession without taking it.
    private final Lock sessionLock = new ReentrantLock();

    // Limits the number of outstanding requests; null when the constructor received a non-positive limit
    private final Semaphore semaphore;
    // The limit passed to the constructor, used in log and error messages
    private final int concurentRpcMsgs;

    // Requests in the order they were sent; replies are matched against the head of this queue
    private final Queue<Request> requests = new ArrayDeque<>();
    // Set by onSessionUp(), reset to null by tearDown()
    private NetconfClientSession currentSession;

    // 'closing' indicates that a close operation has been issued on the session and that tearDown() will be called
    // later to finish the close.
    // It is used to let only one thread enter tearDown(): other threads must NOT enter it at the same time and
    // should end their close operation without calling tearDown(), releasing the locks they hold to avoid deadlock.
    //
    // This field is manipulated using CLOSING VarHandle
    @SuppressWarnings("unused")
    private volatile boolean closing;
91
92     public boolean isSessionClosing() {
93         return (boolean) CLOSING.getVolatile(this);
94     }
95
96     public NetconfDeviceCommunicator(final RemoteDeviceId id,
97             final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit) {
98         this(id, remoteDevice, rpcMessageLimit, null);
99     }
100
101     public NetconfDeviceCommunicator(final RemoteDeviceId id,
102             final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit,
103             final @Nullable UserPreferences overrideNetconfCapabilities) {
104         concurentRpcMsgs = rpcMessageLimit;
105         this.id = id;
106         this.remoteDevice = remoteDevice;
107         this.overrideNetconfCapabilities = overrideNetconfCapabilities;
108         semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
109     }
110
111     @Override
112     public void onSessionUp(final NetconfClientSession session) {
113         sessionLock.lock();
114         try {
115             LOG.debug("{}: Session established", id);
116             currentSession = session;
117
118             var netconfSessionPreferences = NetconfSessionPreferences.fromNetconfSession(session);
119             LOG.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences);
120
121             final var localOverride = overrideNetconfCapabilities;
122             if (localOverride != null) {
123                 final var sessionPreferences = localOverride.sessionPreferences();
124                 netconfSessionPreferences = localOverride.overrideModuleCapabilities()
125                         ? netconfSessionPreferences.replaceModuleCaps(sessionPreferences)
126                         : netconfSessionPreferences.addModuleCaps(sessionPreferences);
127
128                 netconfSessionPreferences = localOverride.overrideNonModuleCapabilities()
129                         ? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences)
130                         : netconfSessionPreferences.addNonModuleCaps(sessionPreferences);
131                 LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id,
132                         netconfSessionPreferences);
133             }
134
135             remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
136         } finally {
137             sessionLock.unlock();
138         }
139     }
140
141     public void disconnect() {
142         // If session is already in closing, no need to close it again
143         if (currentSession != null && CLOSING.compareAndSet(this, false, true) && currentSession.isUp()) {
144             currentSession.close();
145         }
146     }
147
148     private void tearDown(final String reason) {
149         if (!isSessionClosing()) {
150             LOG.warn("It's curious that no one to close the session but tearDown is called!");
151         }
152         LOG.debug("Tearing down {}", reason);
153         final var futuresToCancel = new ArrayList<UncancellableFuture<RpcResult<NetconfMessage>>>();
154         sessionLock.lock();
155         try {
156             if (currentSession != null) {
157                 currentSession = null;
158                 /*
159                  * Walk all requests, check if they have been executing
160                  * or cancelled and remove them from the queue.
161                  */
162                 final var it = requests.iterator();
163                 while (it.hasNext()) {
164                     final var r = it.next();
165                     if (r.future.isUncancellable()) {
166                         futuresToCancel.add(r.future);
167                         it.remove();
168                     } else if (r.future.isCancelled()) {
169                         // This just does some house-cleaning
170                         it.remove();
171                     }
172                 }
173
174                 remoteDevice.onRemoteSessionDown();
175             }
176         } finally {
177             sessionLock.unlock();
178         }
179
180         // Notify pending request futures outside of the sessionLock to avoid unnecessarily blocking the caller.
181         for (var future : futuresToCancel) {
182             if (Strings.isNullOrEmpty(reason)) {
183                 future.set(createSessionDownRpcResult());
184             } else {
185                 future.set(createErrorRpcResult(ErrorType.TRANSPORT, reason));
186             }
187         }
188
189         CLOSING.setVolatile(this, false);
190     }
191
192     private RpcResult<NetconfMessage> createSessionDownRpcResult() {
193         return createErrorRpcResult(ErrorType.TRANSPORT,
194             "The netconf session to %1$s is disconnected".formatted(id.name()));
195     }
196
197     private static RpcResult<NetconfMessage> createErrorRpcResult(final ErrorType errorType, final String message) {
198         return RpcResultBuilder.<NetconfMessage>failed()
199             .withError(errorType, ErrorTag.OPERATION_FAILED, message)
200             .build();
201     }
202
203     @Override
204     public void onSessionDown(final NetconfClientSession session, final Exception exception) {
205         // If session is already in closing, no need to call tearDown again.
206         if (CLOSING.compareAndSet(this, false, true)) {
207             if (exception instanceof EOFException) {
208                 LOG.info("{}: Session went down: {}", id, exception.getMessage());
209             } else {
210                 LOG.warn("{}: Session went down", id, exception);
211             }
212             tearDown(null);
213         }
214     }
215
216     @Override
217     public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
218         // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing.
219         LOG.warn("{}: Session terminated {}", id, reason);
220         tearDown(reason.getErrorMessage());
221     }
222
223     @Override
224     public void close() {
225         // Disconnect from device
226         // tear down not necessary, called indirectly by the close in disconnect()
227         disconnect();
228     }
229
230     @Override
231     public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
232         // Dispatch between notifications and messages. Messages need to be processed with lock held, notifications do
233         // not.
234         if (NotificationMessage.ELEMENT_NAME.equals(XmlElement.fromDomDocument(message.getDocument()).getName())) {
235             if (LOG.isTraceEnabled()) {
236                 LOG.trace("{}: Notification received: {}", id, message);
237             }
238             remoteDevice.onNotification(message);
239         } else {
240             processMessage(message);
241         }
242     }
243
244     @Override
245     public void onError(final NetconfClientSession session, final Exception failure) {
246         final var request = pollRequest();
247         if (request != null) {
248             request.future.set(RpcResultBuilder.<NetconfMessage>failed()
249                 .withRpcError(toRpcError(new DocumentedException(failure.getMessage(),
250                     ErrorType.APPLICATION, ErrorTag.MALFORMED_MESSAGE, ErrorSeverity.ERROR)))
251                 .build());
252         } else {
253             LOG.warn("{}: Ignoring unsolicited failure {}", id, failure.toString());
254         }
255     }
256
257     private @Nullable Request pollRequest() {
258         sessionLock.lock();
259         try {
260             var request = requests.peek();
261             if (request != null && request.future.isUncancellable()) {
262                 request = requests.poll();
263                 // we have just removed one request from the queue
264                 // we can also release one permit
265                 if (semaphore != null) {
266                     semaphore.release();
267                 }
268                 return request;
269             }
270             return null;
271         } finally {
272             sessionLock.unlock();
273         }
274     }
275
276     private void processMessage(final NetconfMessage message) {
277         final var request = pollRequest();
278         if (request == null) {
279             // No matching request, bail out
280             if (LOG.isWarnEnabled()) {
281                 LOG.warn("{}: Ignoring unsolicited message {}", id, msgToS(message));
282             }
283             return;
284         }
285
286         LOG.debug("{}: Message received {}", id, message);
287         if (LOG.isTraceEnabled()) {
288             LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message));
289         }
290
291         final var inputMsgId = request.request.getDocument().getDocumentElement()
292             .getAttribute(XmlNetconfConstants.MESSAGE_ID);
293         final var outputMsgId = message.getDocument().getDocumentElement()
294             .getAttribute(XmlNetconfConstants.MESSAGE_ID);
295         if (!inputMsgId.equals(outputMsgId)) {
296             // FIXME: we should be able to transform directly to RpcError without an intermediate exception
297             final var ex = new DocumentedException("Response message contained unknown \"message-id\"", null,
298                 ErrorType.PROTOCOL, ErrorTag.BAD_ATTRIBUTE, ErrorSeverity.ERROR,
299                 ImmutableMap.of("actual-message-id", outputMsgId, "expected-message-id", inputMsgId));
300             if (LOG.isWarnEnabled()) {
301                 LOG.warn("{}: Invalid request-reply match, reply message contains different message-id, request: {}, "
302                     + "response: {}", id, msgToS(request.request), msgToS(message));
303             }
304             request.future.set(RpcResultBuilder.<NetconfMessage>failed().withRpcError(toRpcError(ex)).build());
305
306             // recursively processing message to eventually find matching request
307             processMessage(message);
308             return;
309         }
310
311         final RpcResult<NetconfMessage> result;
312         if (NetconfMessageUtil.isErrorMessage(message)) {
313             // FIXME: we should be able to transform directly to RpcError without an intermediate exception
314             final var ex = DocumentedException.fromXMLDocument(message.getDocument());
315             if (LOG.isWarnEnabled()) {
316                 LOG.warn("{}: Error reply from remote device, request: {}, response: {}", id, msgToS(request.request),
317                     msgToS(message));
318             }
319             result = RpcResultBuilder.<NetconfMessage>failed().withRpcError(toRpcError(ex)).build();
320         } else {
321             result = RpcResultBuilder.success(message).build();
322         }
323
324         request.future.set(result);
325     }
326
327     private static String msgToS(final NetconfMessage msg) {
328         return XmlUtil.toString(msg.getDocument());
329     }
330
331     private static RpcError toRpcError(final DocumentedException ex) {
332         final var errorInfo = ex.getErrorInfo();
333         final String infoString;
334         if (errorInfo != null) {
335             final var sb = new StringBuilder();
336             for (var e : errorInfo.entrySet()) {
337                 final var tag = e.getKey();
338                 sb.append('<').append(tag).append('>').append(e.getValue()).append("</").append(tag).append('>');
339             }
340             infoString = sb.toString();
341         } else {
342             infoString = "";
343         }
344
345         return ex.getErrorSeverity() == ErrorSeverity.ERROR
346             ? RpcResultBuilder.newError(ex.getErrorType(), ex.getErrorTag(), ex.getLocalizedMessage(), null,
347                 infoString, ex.getCause())
348             : RpcResultBuilder.newWarning(ex.getErrorType(), ex.getErrorTag(), ex.getLocalizedMessage(), null,
349                 infoString, ex.getCause());
350     }
351
352     @Override
353     public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
354         sessionLock.lock();
355         try {
356             if (semaphore != null && !semaphore.tryAcquire()) {
357                 LOG.warn("Limit of concurrent rpc messages was reached (limit: {}). Rpc reply message is needed. "
358                     + "Discarding request of Netconf device with id: {}", concurentRpcMsgs, id.name());
359                 return Futures.immediateFailedFuture(new DocumentedException(
360                         "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
361                         + ") waiting for emptying the queue of Netconf device with id: " + id.name()));
362             }
363
364             return sendRequestWithLock(message, rpc);
365         } finally {
366             sessionLock.unlock();
367         }
368     }
369
370     private ListenableFuture<RpcResult<NetconfMessage>> sendRequestWithLock(final NetconfMessage message,
371             final QName rpc) {
372         if (LOG.isTraceEnabled()) {
373             LOG.trace("{}: Sending message {}", id, msgToS(message));
374         }
375
376         if (currentSession == null) {
377             LOG.warn("{}: Session is disconnected, failing RPC request {}", id, message);
378             return Futures.immediateFuture(createSessionDownRpcResult());
379         }
380
381         final var req = new Request(new UncancellableFuture<>(true), message);
382         requests.add(req);
383
384         currentSession.sendMessage(req.request).addListener(future -> {
385             if (!future.isSuccess()) {
386                 // We expect that a session down will occur at this point
387                 final var cause = future.cause();
388                 if (LOG.isDebugEnabled()) {
389                     LOG.debug("{}: Failed to send request {}", id, XmlUtil.toString(req.request.getDocument()), cause);
390                 }
391
392                 final RpcResult<NetconfMessage> result;
393                 if (cause == null) {
394                     // assume session is down
395                     result = createSessionDownRpcResult();
396                 } else {
397                     result = createErrorRpcResult(ErrorType.TRANSPORT, cause.getLocalizedMessage());
398                 }
399                 req.future.set(result);
400             } else {
401                 LOG.trace("Finished sending request {}", req.request);
402             }
403         });
404
405         return req.future;
406     }
407 }