2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.client.mdsal;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.Strings;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.io.EOFException;
18 import java.lang.invoke.MethodHandles;
19 import java.lang.invoke.VarHandle;
20 import java.util.ArrayDeque;
21 import java.util.ArrayList;
22 import java.util.Queue;
23 import java.util.concurrent.Semaphore;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReentrantLock;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.opendaylight.netconf.api.DocumentedException;
29 import org.opendaylight.netconf.api.NetconfTerminationReason;
30 import org.opendaylight.netconf.api.messages.NetconfMessage;
31 import org.opendaylight.netconf.api.messages.NotificationMessage;
32 import org.opendaylight.netconf.api.xml.XmlElement;
33 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
34 import org.opendaylight.netconf.api.xml.XmlUtil;
35 import org.opendaylight.netconf.client.NetconfClientSession;
36 import org.opendaylight.netconf.client.NetconfClientSessionListener;
37 import org.opendaylight.netconf.client.NetconfMessageUtil;
38 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceCommunicator;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
42 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
43 import org.opendaylight.yangtools.yang.common.ErrorTag;
44 import org.opendaylight.yangtools.yang.common.ErrorType;
45 import org.opendaylight.yangtools.yang.common.QName;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator {
53 private record Request(
54 @NonNull UncancellableFuture<RpcResult<NetconfMessage>> future,
55 @NonNull NetconfMessage request) {
57 requireNonNull(future);
58 requireNonNull(request);
62 private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
63 private static final VarHandle CLOSING;
67 CLOSING = MethodHandles.lookup().findVarHandle(NetconfDeviceCommunicator.class, "closing", boolean.class);
68 } catch (NoSuchFieldException | IllegalAccessException e) {
69 throw new ExceptionInInitializerError(e);
73 protected final RemoteDevice<NetconfDeviceCommunicator> remoteDevice;
74 private final @Nullable UserPreferences overrideNetconfCapabilities;
75 protected final RemoteDeviceId id;
76 private final Lock sessionLock = new ReentrantLock();
78 private final Semaphore semaphore;
79 private final int concurentRpcMsgs;
81 private final Queue<Request> requests = new ArrayDeque<>();
82 private NetconfClientSession currentSession;
84 // isSessionClosing indicates a close operation on the session is issued and tearDown will surely be called later
85 // to finish the close.
86 // Used to allow only one thread to enter tearDown and other threads should NOT enter it simultaneously and should
87 // end its close operation without calling tearDown to release the locks they hold to avoid deadlock.
89 // This field is manipulated using CLOSING VarHandle
90 @SuppressWarnings("unused")
91 @SuppressFBWarnings(value = "UUF_UNUSED_FIELD", justification = "https://github.com/spotbugs/spotbugs/issues/2749")
92 private volatile boolean closing;
94 public boolean isSessionClosing() {
95 return (boolean) CLOSING.getVolatile(this);
98 public NetconfDeviceCommunicator(final RemoteDeviceId id,
99 final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit) {
100 this(id, remoteDevice, rpcMessageLimit, null);
103 public NetconfDeviceCommunicator(final RemoteDeviceId id,
104 final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit,
105 final @Nullable UserPreferences overrideNetconfCapabilities) {
106 concurentRpcMsgs = rpcMessageLimit;
108 this.remoteDevice = remoteDevice;
109 this.overrideNetconfCapabilities = overrideNetconfCapabilities;
110 semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
114 public void onSessionUp(final NetconfClientSession session) {
117 LOG.debug("{}: Session established", id);
118 currentSession = session;
120 var netconfSessionPreferences = NetconfSessionPreferences.fromNetconfSession(session);
121 LOG.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences);
123 final var localOverride = overrideNetconfCapabilities;
124 if (localOverride != null) {
125 final var sessionPreferences = localOverride.sessionPreferences();
126 netconfSessionPreferences = localOverride.overrideModuleCapabilities()
127 ? netconfSessionPreferences.replaceModuleCaps(sessionPreferences)
128 : netconfSessionPreferences.addModuleCaps(sessionPreferences);
130 netconfSessionPreferences = localOverride.overrideNonModuleCapabilities()
131 ? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences)
132 : netconfSessionPreferences.addNonModuleCaps(sessionPreferences);
133 LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id,
134 netconfSessionPreferences);
137 remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
139 sessionLock.unlock();
143 public void disconnect() {
144 // If session is already in closing, no need to close it again
145 if (currentSession != null && CLOSING.compareAndSet(this, false, true) && currentSession.isUp()) {
146 currentSession.close();
150 private void tearDown(final String reason) {
151 if (!isSessionClosing()) {
152 LOG.warn("It's curious that no one to close the session but tearDown is called!");
154 LOG.debug("Tearing down {}", reason);
155 final var futuresToCancel = new ArrayList<UncancellableFuture<RpcResult<NetconfMessage>>>();
158 if (currentSession != null) {
159 currentSession = null;
161 * Walk all requests, check if they have been executing
162 * or cancelled and remove them from the queue.
164 final var it = requests.iterator();
165 while (it.hasNext()) {
166 final var r = it.next();
167 if (r.future.isUncancellable()) {
168 futuresToCancel.add(r.future);
170 } else if (r.future.isCancelled()) {
171 // This just does some house-cleaning
176 remoteDevice.onRemoteSessionDown();
179 sessionLock.unlock();
182 // Notify pending request futures outside of the sessionLock to avoid unnecessarily blocking the caller.
183 for (var future : futuresToCancel) {
184 if (Strings.isNullOrEmpty(reason)) {
185 future.set(createSessionDownRpcResult());
187 future.set(createErrorRpcResult(ErrorType.TRANSPORT, reason));
191 CLOSING.setVolatile(this, false);
194 private RpcResult<NetconfMessage> createSessionDownRpcResult() {
195 return createErrorRpcResult(ErrorType.TRANSPORT,
196 "The netconf session to %1$s is disconnected".formatted(id.name()));
199 private static RpcResult<NetconfMessage> createErrorRpcResult(final ErrorType errorType, final String message) {
200 return RpcResultBuilder.<NetconfMessage>failed()
201 .withError(errorType, ErrorTag.OPERATION_FAILED, message)
206 public void onSessionDown(final NetconfClientSession session, final Exception exception) {
207 // If session is already in closing, no need to call tearDown again.
208 if (CLOSING.compareAndSet(this, false, true)) {
209 if (exception instanceof EOFException) {
210 LOG.info("{}: Session went down: {}", id, exception.getMessage());
212 LOG.warn("{}: Session went down", id, exception);
219 public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
220 // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing.
221 LOG.warn("{}: Session terminated {}", id, reason);
222 tearDown(reason.getErrorMessage());
226 public void close() {
227 // Disconnect from device
228 // tear down not necessary, called indirectly by the close in disconnect()
233 public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
234 // Dispatch between notifications and messages. Messages need to be processed with lock held, notifications do
236 if (NotificationMessage.ELEMENT_NAME.equals(XmlElement.fromDomDocument(message.getDocument()).getName())) {
237 if (LOG.isTraceEnabled()) {
238 LOG.trace("{}: Notification received: {}", id, message);
240 remoteDevice.onNotification(message);
242 processMessage(message);
247 public void onError(final NetconfClientSession session, final Exception failure) {
248 final var request = pollRequest();
249 if (request != null) {
250 request.future.set(RpcResultBuilder.<NetconfMessage>failed()
251 .withRpcError(toRpcError(new DocumentedException(failure.getMessage(),
252 ErrorType.APPLICATION, ErrorTag.MALFORMED_MESSAGE, ErrorSeverity.ERROR)))
255 LOG.warn("{}: Ignoring unsolicited failure {}", id, failure.toString());
259 private @Nullable Request pollRequest() {
262 var request = requests.peek();
263 if (request != null && request.future.isUncancellable()) {
264 request = requests.poll();
265 // we have just removed one request from the queue
266 // we can also release one permit
267 if (semaphore != null) {
274 sessionLock.unlock();
278 private void processMessage(final NetconfMessage message) {
279 final var request = pollRequest();
280 if (request == null) {
281 // No matching request, bail out
282 if (LOG.isWarnEnabled()) {
283 LOG.warn("{}: Ignoring unsolicited message {}", id, msgToS(message));
288 LOG.debug("{}: Message received {}", id, message);
289 if (LOG.isTraceEnabled()) {
290 LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message));
293 final var inputMsgId = request.request.getDocument().getDocumentElement()
294 .getAttribute(XmlNetconfConstants.MESSAGE_ID);
295 final var outputMsgId = message.getDocument().getDocumentElement()
296 .getAttribute(XmlNetconfConstants.MESSAGE_ID);
297 if (!inputMsgId.equals(outputMsgId)) {
298 // FIXME: we should be able to transform directly to RpcError without an intermediate exception
299 final var ex = new DocumentedException("Response message contained unknown \"message-id\"", null,
300 ErrorType.PROTOCOL, ErrorTag.BAD_ATTRIBUTE, ErrorSeverity.ERROR,
301 ImmutableMap.of("actual-message-id", outputMsgId, "expected-message-id", inputMsgId));
302 if (LOG.isWarnEnabled()) {
303 LOG.warn("{}: Invalid request-reply match, reply message contains different message-id, request: {}, "
304 + "response: {}", id, msgToS(request.request), msgToS(message));
306 request.future.set(RpcResultBuilder.<NetconfMessage>failed().withRpcError(toRpcError(ex)).build());
308 // recursively processing message to eventually find matching request
309 processMessage(message);
313 final RpcResult<NetconfMessage> result;
314 if (NetconfMessageUtil.isErrorMessage(message)) {
315 // FIXME: we should be able to transform directly to RpcError without an intermediate exception
316 final var ex = DocumentedException.fromXMLDocument(message.getDocument());
317 if (LOG.isWarnEnabled()) {
318 LOG.warn("{}: Error reply from remote device, request: {}, response: {}", id, msgToS(request.request),
321 result = RpcResultBuilder.<NetconfMessage>failed().withRpcError(toRpcError(ex)).build();
323 result = RpcResultBuilder.success(message).build();
326 request.future.set(result);
329 private static String msgToS(final NetconfMessage msg) {
330 return XmlUtil.toString(msg.getDocument());
333 private static RpcError toRpcError(final DocumentedException ex) {
334 final var errorInfo = ex.getErrorInfo();
335 final String infoString;
336 if (errorInfo != null) {
337 final var sb = new StringBuilder();
338 for (var e : errorInfo.entrySet()) {
339 final var tag = e.getKey();
340 sb.append('<').append(tag).append('>').append(e.getValue()).append("</").append(tag).append('>');
342 infoString = sb.toString();
347 return ex.getErrorSeverity() == ErrorSeverity.ERROR
348 ? RpcResultBuilder.newError(ex.getErrorType(), ex.getErrorTag(), ex.getLocalizedMessage(), null,
349 infoString, ex.getCause())
350 : RpcResultBuilder.newWarning(ex.getErrorType(), ex.getErrorTag(), ex.getLocalizedMessage(), null,
351 infoString, ex.getCause());
355 public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
358 if (semaphore != null && !semaphore.tryAcquire()) {
359 LOG.warn("Limit of concurrent rpc messages was reached (limit: {}). Rpc reply message is needed. "
360 + "Discarding request of Netconf device with id: {}", concurentRpcMsgs, id.name());
361 return Futures.immediateFailedFuture(new DocumentedException(
362 "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
363 + ") waiting for emptying the queue of Netconf device with id: " + id.name()));
366 return sendRequestWithLock(message, rpc);
368 sessionLock.unlock();
372 private ListenableFuture<RpcResult<NetconfMessage>> sendRequestWithLock(final NetconfMessage message,
374 if (LOG.isTraceEnabled()) {
375 LOG.trace("{}: Sending message {}", id, msgToS(message));
378 if (currentSession == null) {
379 LOG.warn("{}: Session is disconnected, failing RPC request {}", id, message);
380 return Futures.immediateFuture(createSessionDownRpcResult());
383 final var req = new Request(new UncancellableFuture<>(true), message);
386 currentSession.sendMessage(req.request).addListener(future -> {
387 if (!future.isSuccess()) {
388 // We expect that a session down will occur at this point
389 final var cause = future.cause();
390 if (LOG.isDebugEnabled()) {
391 LOG.debug("{}: Failed to send request {}", id, XmlUtil.toString(req.request.getDocument()), cause);
394 final RpcResult<NetconfMessage> result;
396 // assume session is down
397 result = createSessionDownRpcResult();
399 result = createErrorRpcResult(ErrorType.TRANSPORT, cause.getLocalizedMessage());
401 req.future.set(result);
403 LOG.trace("Finished sending request {}", req.request);