2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.client.mdsal;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.Strings;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.io.EOFException;
17 import java.lang.invoke.MethodHandles;
18 import java.lang.invoke.VarHandle;
19 import java.util.ArrayDeque;
20 import java.util.ArrayList;
21 import java.util.Queue;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.locks.Lock;
24 import java.util.concurrent.locks.ReentrantLock;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.opendaylight.netconf.api.DocumentedException;
28 import org.opendaylight.netconf.api.NetconfTerminationReason;
29 import org.opendaylight.netconf.api.messages.NetconfMessage;
30 import org.opendaylight.netconf.api.messages.NotificationMessage;
31 import org.opendaylight.netconf.api.xml.XmlElement;
32 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
33 import org.opendaylight.netconf.api.xml.XmlUtil;
34 import org.opendaylight.netconf.client.NetconfClientSession;
35 import org.opendaylight.netconf.client.NetconfClientSessionListener;
36 import org.opendaylight.netconf.client.NetconfMessageUtil;
37 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
38 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceCommunicator;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
41 import org.opendaylight.yangtools.yang.common.ErrorSeverity;
42 import org.opendaylight.yangtools.yang.common.ErrorTag;
43 import org.opendaylight.yangtools.yang.common.ErrorType;
44 import org.opendaylight.yangtools.yang.common.QName;
45 import org.opendaylight.yangtools.yang.common.RpcError;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator {
52 private record Request(
53 @NonNull UncancellableFuture<RpcResult<NetconfMessage>> future,
54 @NonNull NetconfMessage request) {
56 requireNonNull(future);
57 requireNonNull(request);
private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);

/** Handle giving volatile and compare-and-set access to the {@code closing} field. */
private static final VarHandle CLOSING;

static {
    final var lookup = MethodHandles.lookup();
    try {
        CLOSING = lookup.findVarHandle(NetconfDeviceCommunicator.class, "closing", boolean.class);
    } catch (NoSuchFieldException | IllegalAccessException e) {
        // The field is declared in this very class, so failing here means the class itself is broken
        throw new ExceptionInInitializerError(e);
    }
}
72 protected final RemoteDevice<NetconfDeviceCommunicator> remoteDevice;
73 private final @Nullable UserPreferences overrideNetconfCapabilities;
74 protected final RemoteDeviceId id;
75 private final Lock sessionLock = new ReentrantLock();
77 private final Semaphore semaphore;
78 private final int concurentRpcMsgs;
80 private final Queue<Request> requests = new ArrayDeque<>();
81 private NetconfClientSession currentSession;
83 // isSessionClosing indicates a close operation on the session is issued and tearDown will surely be called later
84 // to finish the close.
85 // Used to allow only one thread to enter tearDown and other threads should NOT enter it simultaneously and should
86 // end its close operation without calling tearDown to release the locks they hold to avoid deadlock.
88 // This field is manipulated using CLOSING VarHandle
89 @SuppressWarnings("unused")
90 private volatile boolean closing;
92 public boolean isSessionClosing() {
93 return (boolean) CLOSING.getVolatile(this);
/**
 * Creates a communicator which does not override the capabilities advertised by the device.
 *
 * @param id identifier of the remote device
 * @param remoteDevice device which is notified about session events and notifications
 * @param rpcMessageLimit maximum number of outstanding RPCs; a non-positive value disables the limit
 */
public NetconfDeviceCommunicator(final RemoteDeviceId id,
        final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit) {
    // No capability overrides
    this(id, remoteDevice, rpcMessageLimit, null);
}
/**
 * Creates a communicator, optionally overriding the capabilities advertised by the device.
 *
 * @param id identifier of the remote device
 * @param remoteDevice device which is notified about session events and notifications
 * @param rpcMessageLimit maximum number of outstanding RPCs; a non-positive value disables the limit
 * @param overrideNetconfCapabilities capabilities applied on top of the advertised ones, or {@code null}
 */
public NetconfDeviceCommunicator(final RemoteDeviceId id,
        final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit,
        final @Nullable UserPreferences overrideNetconfCapabilities) {
    this.id = id;
    this.remoteDevice = remoteDevice;
    this.overrideNetconfCapabilities = overrideNetconfCapabilities;
    concurentRpcMsgs = rpcMessageLimit;

    // Without a positive limit there is nothing to meter, hence no semaphore
    if (rpcMessageLimit > 0) {
        semaphore = new Semaphore(rpcMessageLimit);
    } else {
        semaphore = null;
    }
}
112 public void onSessionUp(final NetconfClientSession session) {
115 LOG.debug("{}: Session established", id);
116 currentSession = session;
118 var netconfSessionPreferences = NetconfSessionPreferences.fromNetconfSession(session);
119 LOG.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences);
121 final var localOverride = overrideNetconfCapabilities;
122 if (localOverride != null) {
123 final var sessionPreferences = localOverride.sessionPreferences();
124 netconfSessionPreferences = localOverride.overrideModuleCapabilities()
125 ? netconfSessionPreferences.replaceModuleCaps(sessionPreferences)
126 : netconfSessionPreferences.addModuleCaps(sessionPreferences);
128 netconfSessionPreferences = localOverride.overrideNonModuleCapabilities()
129 ? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences)
130 : netconfSessionPreferences.addNonModuleCaps(sessionPreferences);
131 LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id,
132 netconfSessionPreferences);
135 remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
137 sessionLock.unlock();
141 public void disconnect() {
142 // If session is already in closing, no need to close it again
143 if (currentSession != null && CLOSING.compareAndSet(this, false, true) && currentSession.isUp()) {
144 currentSession.close();
148 private void tearDown(final String reason) {
149 if (!isSessionClosing()) {
150 LOG.warn("It's curious that no one to close the session but tearDown is called!");
152 LOG.debug("Tearing down {}", reason);
153 final var futuresToCancel = new ArrayList<UncancellableFuture<RpcResult<NetconfMessage>>>();
156 if (currentSession != null) {
157 currentSession = null;
159 * Walk all requests, check if they have been executing
160 * or cancelled and remove them from the queue.
162 final var it = requests.iterator();
163 while (it.hasNext()) {
164 final var r = it.next();
165 if (r.future.isUncancellable()) {
166 futuresToCancel.add(r.future);
168 } else if (r.future.isCancelled()) {
169 // This just does some house-cleaning
174 remoteDevice.onRemoteSessionDown();
177 sessionLock.unlock();
180 // Notify pending request futures outside of the sessionLock to avoid unnecessarily blocking the caller.
181 for (var future : futuresToCancel) {
182 if (Strings.isNullOrEmpty(reason)) {
183 future.set(createSessionDownRpcResult());
185 future.set(createErrorRpcResult(ErrorType.TRANSPORT, reason));
189 CLOSING.setVolatile(this, false);
192 private RpcResult<NetconfMessage> createSessionDownRpcResult() {
193 return createErrorRpcResult(ErrorType.TRANSPORT,
194 "The netconf session to %1$s is disconnected".formatted(id.name()));
197 private static RpcResult<NetconfMessage> createErrorRpcResult(final ErrorType errorType, final String message) {
198 return RpcResultBuilder.<NetconfMessage>failed()
199 .withError(errorType, ErrorTag.OPERATION_FAILED, message)
204 public void onSessionDown(final NetconfClientSession session, final Exception exception) {
205 // If session is already in closing, no need to call tearDown again.
206 if (CLOSING.compareAndSet(this, false, true)) {
207 if (exception instanceof EOFException) {
208 LOG.info("{}: Session went down: {}", id, exception.getMessage());
210 LOG.warn("{}: Session went down", id, exception);
217 public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
218 // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing.
219 LOG.warn("{}: Session terminated {}", id, reason);
220 tearDown(reason.getErrorMessage());
224 public void close() {
225 // Disconnect from device
226 // tear down not necessary, called indirectly by the close in disconnect()
231 public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
232 // Dispatch between notifications and messages. Messages need to be processed with lock held, notifications do
234 if (NotificationMessage.ELEMENT_NAME.equals(XmlElement.fromDomDocument(message.getDocument()).getName())) {
235 if (LOG.isTraceEnabled()) {
236 LOG.trace("{}: Notification received: {}", id, message);
238 remoteDevice.onNotification(message);
240 processMessage(message);
245 public void onError(final NetconfClientSession session, final Exception failure) {
246 final var request = pollRequest();
247 if (request != null) {
248 request.future.set(RpcResultBuilder.<NetconfMessage>failed()
249 .withRpcError(toRpcError(new DocumentedException(failure.getMessage(),
250 ErrorType.APPLICATION, ErrorTag.MALFORMED_MESSAGE, ErrorSeverity.ERROR)))
253 LOG.warn("{}: Ignoring unsolicited failure {}", id, failure.toString());
257 private @Nullable Request pollRequest() {
260 var request = requests.peek();
261 if (request != null && request.future.isUncancellable()) {
262 request = requests.poll();
263 // we have just removed one request from the queue
264 // we can also release one permit
265 if (semaphore != null) {
272 sessionLock.unlock();
276 private void processMessage(final NetconfMessage message) {
277 final var request = pollRequest();
278 if (request == null) {
279 // No matching request, bail out
280 if (LOG.isWarnEnabled()) {
281 LOG.warn("{}: Ignoring unsolicited message {}", id, msgToS(message));
286 LOG.debug("{}: Message received {}", id, message);
287 if (LOG.isTraceEnabled()) {
288 LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message));
291 final var inputMsgId = request.request.getDocument().getDocumentElement()
292 .getAttribute(XmlNetconfConstants.MESSAGE_ID);
293 final var outputMsgId = message.getDocument().getDocumentElement()
294 .getAttribute(XmlNetconfConstants.MESSAGE_ID);
295 if (!inputMsgId.equals(outputMsgId)) {
296 // FIXME: we should be able to transform directly to RpcError without an intermediate exception
297 final var ex = new DocumentedException("Response message contained unknown \"message-id\"", null,
298 ErrorType.PROTOCOL, ErrorTag.BAD_ATTRIBUTE, ErrorSeverity.ERROR,
299 ImmutableMap.of("actual-message-id", outputMsgId, "expected-message-id", inputMsgId));
300 if (LOG.isWarnEnabled()) {
301 LOG.warn("{}: Invalid request-reply match, reply message contains different message-id, request: {}, "
302 + "response: {}", id, msgToS(request.request), msgToS(message));
304 request.future.set(RpcResultBuilder.<NetconfMessage>failed().withRpcError(toRpcError(ex)).build());
306 // recursively processing message to eventually find matching request
307 processMessage(message);
311 final RpcResult<NetconfMessage> result;
312 if (NetconfMessageUtil.isErrorMessage(message)) {
313 // FIXME: we should be able to transform directly to RpcError without an intermediate exception
314 final var ex = DocumentedException.fromXMLDocument(message.getDocument());
315 if (LOG.isWarnEnabled()) {
316 LOG.warn("{}: Error reply from remote device, request: {}, response: {}", id, msgToS(request.request),
319 result = RpcResultBuilder.<NetconfMessage>failed().withRpcError(toRpcError(ex)).build();
321 result = RpcResultBuilder.success(message).build();
324 request.future.set(result);
327 private static String msgToS(final NetconfMessage msg) {
328 return XmlUtil.toString(msg.getDocument());
// Converts a DocumentedException into an RpcError: an error when the exception's severity is ERROR, a warning
// for any other severity. Type, tag, localized message and cause are taken from the exception; the application
// tag is always passed as null.
331 private static RpcError toRpcError(final DocumentedException ex) {
332 final var errorInfo = ex.getErrorInfo();
333 final String infoString;
334 if (errorInfo != null) {
// Flatten the error-info map into a run of <tag>value</tag> elements. Keys and values are appended as they
// are; no escaping is applied here.
335 final var sb = new StringBuilder();
336 for (var e : errorInfo.entrySet()) {
337 final var tag = e.getKey();
338 sb.append('<').append(tag).append('>').append(e.getValue()).append("</").append(tag).append('>');
340 infoString = sb.toString();
// NOTE(review): the branch taken when errorInfo is null is not visible in this excerpt -- confirm which value
// infoString receives in that case.
345 return ex.getErrorSeverity() == ErrorSeverity.ERROR
346 ? RpcResultBuilder.newError(ex.getErrorType(), ex.getErrorTag(), ex.getLocalizedMessage(), null,
347 infoString, ex.getCause())
348 : RpcResultBuilder.newWarning(ex.getErrorType(), ex.getErrorTag(), ex.getLocalizedMessage(), null,
349 infoString, ex.getCause());
353 public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
356 if (semaphore != null && !semaphore.tryAcquire()) {
357 LOG.warn("Limit of concurrent rpc messages was reached (limit: {}). Rpc reply message is needed. "
358 + "Discarding request of Netconf device with id: {}", concurentRpcMsgs, id.name());
359 return Futures.immediateFailedFuture(new DocumentedException(
360 "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
361 + ") waiting for emptying the queue of Netconf device with id: " + id.name()));
364 return sendRequestWithLock(message, rpc);
366 sessionLock.unlock();
370 private ListenableFuture<RpcResult<NetconfMessage>> sendRequestWithLock(final NetconfMessage message,
372 if (LOG.isTraceEnabled()) {
373 LOG.trace("{}: Sending message {}", id, msgToS(message));
376 if (currentSession == null) {
377 LOG.warn("{}: Session is disconnected, failing RPC request {}", id, message);
378 return Futures.immediateFuture(createSessionDownRpcResult());
381 final var req = new Request(new UncancellableFuture<>(true), message);
384 currentSession.sendMessage(req.request).addListener(future -> {
385 if (!future.isSuccess()) {
386 // We expect that a session down will occur at this point
387 final var cause = future.cause();
388 if (LOG.isDebugEnabled()) {
389 LOG.debug("{}: Failed to send request {}", id, XmlUtil.toString(req.request.getDocument()), cause);
392 final RpcResult<NetconfMessage> result;
394 // assume session is down
395 result = createSessionDownRpcResult();
397 result = createErrorRpcResult(ErrorType.TRANSPORT, cause.getLocalizedMessage());
399 req.future.set(result);
401 LOG.trace("Finished sending request {}", req.request);