X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Fconnect%2Fnetconf%2Flistener%2FNetconfDeviceCommunicator.java;h=2b6e8aaa89699d15f55da9565759589ee2361ed4;hb=1dba5b2651d7fc60af0263cc0640d5ebeda8e454;hp=b55b42f669418683cdbabff42ee349b96cbc6ac6;hpb=6d7e12bf3ef64e5004703a1d540e7e26f30a9595;p=netconf.git diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java index b55b42f669..2b6e8aaa89 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java @@ -7,28 +7,28 @@ */ package org.opendaylight.netconf.sal.connect.netconf.listener; -import com.google.common.base.Optional; import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.netty.util.concurrent.Future; +import java.io.EOFException; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.opendaylight.controller.config.util.xml.XmlElement; -import org.opendaylight.controller.config.util.xml.XmlUtil; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.netconf.api.FailedNetconfMessage; import org.opendaylight.netconf.api.NetconfDocumentedException; import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfTerminationReason; +import org.opendaylight.netconf.api.xml.XmlElement; import org.opendaylight.netconf.api.xml.XmlNetconfConstants; +import org.opendaylight.netconf.api.xml.XmlUtil; import org.opendaylight.netconf.client.NetconfClientDispatcher; import org.opendaylight.netconf.client.NetconfClientSession; import org.opendaylight.netconf.client.NetconfClientSessionListener; @@ -36,22 +36,23 @@ import org.opendaylight.netconf.client.conf.NetconfClientConfiguration; import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration; import org.opendaylight.netconf.sal.connect.api.RemoteDevice; import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator; +import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil; -import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; +import org.opendaylight.yangtools.yang.common.Empty; +import org.opendaylight.yangtools.yang.common.ErrorTag; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NetconfDeviceCommunicator - implements NetconfClientSessionListener, RemoteDeviceCommunicator { - +public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator { private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class); - protected final RemoteDevice remoteDevice; - private final Optional overrideNetconfCapabilities; + protected final RemoteDevice remoteDevice; + private final @Nullable UserPreferences overrideNetconfCapabilities; protected final RemoteDeviceId id; private final Lock sessionLock = new ReentrantLock(); @@ -59,46 +60,37 @@ public class NetconfDeviceCommunicator private final int concurentRpcMsgs; private final Queue requests = new ArrayDeque<>(); - private NetconfClientSession session; + private NetconfClientSession currentSession; - private Future initFuture; - private final SettableFuture firstConnectionFuture; + private final SettableFuture firstConnectionFuture = SettableFuture.create(); + private Future taskFuture; // isSessionClosing indicates a close operation on the session is issued and // tearDown will surely be called later to finish the close. // Used to allow only one thread to enter tearDown and other threads should // NOT enter it simultaneously and should end its close operation without // calling tearDown to release the locks they hold to avoid deadlock. - private final AtomicBoolean isSessionClosing = new AtomicBoolean(false); - - public Boolean isSessionClosing() { - return isSessionClosing.get(); - } + private static final AtomicIntegerFieldUpdater CLOSING_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(NetconfDeviceCommunicator.class, "closing"); + private volatile int closing; - public NetconfDeviceCommunicator( - final RemoteDeviceId id, - final RemoteDevice remoteDevice, - final UserPreferences netconfSessionPreferences, final int rpcMessageLimit) { - this(id, remoteDevice, Optional.of(netconfSessionPreferences), rpcMessageLimit); + public boolean isSessionClosing() { + return closing != 0; } - public NetconfDeviceCommunicator( - final RemoteDeviceId id, - final RemoteDevice remoteDevice, - final int rpcMessageLimit) { - this(id, remoteDevice, Optional.absent(), rpcMessageLimit); + public NetconfDeviceCommunicator(final RemoteDeviceId id, + final RemoteDevice remoteDevice, final int rpcMessageLimit) { + this(id, remoteDevice, rpcMessageLimit, null); } - private NetconfDeviceCommunicator( - final RemoteDeviceId id, - final RemoteDevice remoteDevice, - final Optional overrideNetconfCapabilities, final int rpcMessageLimit) { - this.concurentRpcMsgs = rpcMessageLimit; + public NetconfDeviceCommunicator(final RemoteDeviceId id, + final RemoteDevice remoteDevice, final int rpcMessageLimit, + final @Nullable UserPreferences overrideNetconfCapabilities) { + concurentRpcMsgs = rpcMessageLimit; this.id = id; this.remoteDevice = remoteDevice; this.overrideNetconfCapabilities = overrideNetconfCapabilities; - this.firstConnectionFuture = SettableFuture.create(); - this.semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null; + semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null; } @Override @@ -106,21 +98,19 @@ public class NetconfDeviceCommunicator sessionLock.lock(); try { LOG.debug("{}: Session established", id); - this.session = session; + currentSession = session; - NetconfSessionPreferences netconfSessionPreferences = - NetconfSessionPreferences.fromNetconfSession(session); - LOG.trace("{}: Session advertised capabilities: {}", id, - netconfSessionPreferences); + var netconfSessionPreferences = NetconfSessionPreferences.fromNetconfSession(session); + LOG.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences); - if (overrideNetconfCapabilities.isPresent()) { - final NetconfSessionPreferences sessionPreferences = overrideNetconfCapabilities - .get().getSessionPreferences(); - netconfSessionPreferences = overrideNetconfCapabilities.get().moduleBasedCapsOverrided() + final var localOverride = overrideNetconfCapabilities; + if (localOverride != null) { + final var sessionPreferences = localOverride.sessionPreferences(); + netconfSessionPreferences = localOverride.overrideModuleCapabilities() ? netconfSessionPreferences.replaceModuleCaps(sessionPreferences) : netconfSessionPreferences.addModuleCaps(sessionPreferences); - netconfSessionPreferences = overrideNetconfCapabilities.get().nonModuleBasedCapsOverrided() + netconfSessionPreferences = localOverride.overrideNonModuleCapabilities() ? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences) : netconfSessionPreferences.addNonModuleCaps(sessionPreferences); LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, @@ -128,12 +118,14 @@ public class NetconfDeviceCommunicator } remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this); - if (!firstConnectionFuture.isDone()) { - firstConnectionFuture.set(netconfSessionPreferences.getNetconfDeviceCapabilities()); - } } finally { sessionLock.unlock(); } + + // FIXME: right, except ... this does not include the device schema setup, so is it really useful? + if (!firstConnectionFuture.set(Empty.value())) { + LOG.trace("{}: First connection already completed", id); + } } /** @@ -141,23 +133,31 @@ public class NetconfDeviceCommunicator * * @param dispatcher {@code NetconfCLientDispatcher} * @param config {@code NetconfClientConfiguration} - * @return future that returns succes on first succesfull connection and failure when the underlying - * reconnecting strategy runs out of reconnection attempts + * @return a ListenableFuture that returns success on first successful connection and failure when the underlying + * reconnecting strategy runs out of reconnection attempts */ - public ListenableFuture initializeRemoteConnection( - final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) { + public ListenableFuture initializeRemoteConnection(final NetconfClientDispatcher dispatcher, + final NetconfClientConfiguration config) { + + final Future connectFuture; if (config instanceof NetconfReconnectingClientConfiguration) { - initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config); + // FIXME: This is weird. If I understand it correctly we want to know about the first connection so as to + // forward error state. Analyze the call graph to understand what is going on here. We really want + // to move reconnection away from the socket layer, so that it can properly interface with sessions + // and generally has some event-driven state (as all good network glue does). There is a second story + // which is we want to avoid duplicate code, so it depends on other users as well. + final var future = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config); + taskFuture = future; + connectFuture = future.firstSessionFuture(); } else { - initFuture = dispatcher.createClient(config); + taskFuture = connectFuture = dispatcher.createClient(config); } - - initFuture.addListener(future -> { + connectFuture.addListener(future -> { if (!future.isSuccess() && !future.isCancelled()) { LOG.debug("{}: Connection failed", id, future.cause()); - NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause()); - if (firstConnectionFuture.isDone()) { + remoteDevice.onRemoteSessionFailed(future.cause()); + if (!firstConnectionFuture.isDone()) { firstConnectionFuture.setException(future.cause()); } } @@ -167,8 +167,8 @@ public class NetconfDeviceCommunicator public void disconnect() { // If session is already in closing, no need to close it again - if (session != null && isSessionClosing.compareAndSet(false, true)) { - session.close(); + if (currentSession != null && startClosing() && currentSession.isUp()) { + currentSession.close(); } } @@ -177,11 +177,11 @@ public class NetconfDeviceCommunicator LOG.warn("It's curious that no one to close the session but tearDown is called!"); } LOG.debug("Tearing down {}", reason); - final List>> futuresToCancel = Lists.newArrayList(); + final List>> futuresToCancel = new ArrayList<>(); sessionLock.lock(); try { - if (session != null) { - session = null; + if (currentSession != null) { + currentSession = null; /* * Walk all requests, check if they have been executing * or cancelled and remove them from the queue. @@ -210,29 +210,32 @@ public class NetconfDeviceCommunicator if (Strings.isNullOrEmpty(reason)) { future.set(createSessionDownRpcResult()); } else { - future.set(createErrorRpcResult(RpcError.ErrorType.TRANSPORT, reason)); + future.set(createErrorRpcResult(ErrorType.TRANSPORT, reason)); } } - isSessionClosing.set(false); + closing = 0; } private RpcResult createSessionDownRpcResult() { - return createErrorRpcResult(RpcError.ErrorType.TRANSPORT, - String.format("The netconf session to %1$s is disconnected", id.getName())); + return createErrorRpcResult(ErrorType.TRANSPORT, + String.format("The netconf session to %1$s is disconnected", id.name())); } - private static RpcResult createErrorRpcResult(final RpcError.ErrorType errorType, - final String message) { + private static RpcResult createErrorRpcResult(final ErrorType errorType, final String message) { return RpcResultBuilder.failed() - .withError(errorType, NetconfDocumentedException.ErrorTag.OPERATION_FAILED.getTagValue(), message).build(); + .withError(errorType, ErrorTag.OPERATION_FAILED, message).build(); } @Override public void onSessionDown(final NetconfClientSession session, final Exception exception) { // If session is already in closing, no need to call tearDown again. - if (isSessionClosing.compareAndSet(false, true)) { - LOG.warn("{}: Session went down", id, exception); + if (startClosing()) { + if (exception instanceof EOFException) { + LOG.info("{}: Session went down: {}", id, exception.getMessage()); + } else { + LOG.warn("{}: Session went down", id, exception); + } tearDown(null); } } @@ -247,8 +250,8 @@ public class NetconfDeviceCommunicator @Override public void close() { // Cancel reconnect if in progress - if (initFuture != null) { - initFuture.cancel(false); + if (taskFuture != null) { + taskFuture.cancel(false); } // Disconnect from device // tear down not necessary, called indirectly by the close in disconnect() @@ -275,7 +278,7 @@ public class NetconfDeviceCommunicator try { request = requests.peek(); if (request != null && request.future.isUncancellable()) { - requests.poll(); + request = requests.poll(); // we have just removed one request from the queue // we can also release one permit if (semaphore != null) { @@ -290,50 +293,51 @@ public class NetconfDeviceCommunicator sessionLock.unlock(); } - if (request != null) { - - if (FailedNetconfMessage.class.isInstance(message)) { - request.future.set(NetconfMessageTransformUtil.toRpcResult((FailedNetconfMessage) message)); - return; - } - - LOG.debug("{}: Message received {}", id, message); + if (request == null) { + // No matching request, bail out + return; + } - if (LOG.isTraceEnabled()) { - LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message)); - } - try { - NetconfMessageTransformUtil.checkValidReply(request.request, message); - } catch (final NetconfDocumentedException e) { - LOG.warn( - "{}: Invalid request-reply match," - + "reply message contains different message-id, request: {}, response: {}", - id, msgToS(request.request), msgToS(message), e); + if (message instanceof FailedNetconfMessage) { + request.future.set(NetconfMessageTransformUtil.toRpcResult((FailedNetconfMessage) message)); + return; + } - request.future.set(RpcResultBuilder.failed() - .withRpcError(NetconfMessageTransformUtil.toRpcError(e)).build()); + LOG.debug("{}: Message received {}", id, message); - //recursively processing message to eventually find matching request - processMessage(message); + if (LOG.isTraceEnabled()) { + LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message)); + } - return; - } + try { + NetconfMessageTransformUtil.checkValidReply(request.request, message); + } catch (final NetconfDocumentedException e) { + LOG.warn("{}: Invalid request-reply match, reply message contains different message-id, " + + "request: {}, response: {}", id, msgToS(request.request), msgToS(message), e); - try { - NetconfMessageTransformUtil.checkSuccessReply(message); - } catch (final NetconfDocumentedException e) { - LOG.warn( - "{}: Error reply from remote device, request: {}, response: {}", - id, msgToS(request.request), msgToS(message), e); + request.future.set(RpcResultBuilder.failed() + .withRpcError(NetconfMessageTransformUtil.toRpcError(e)) + .build()); - request.future.set(RpcResultBuilder.failed() - .withRpcError(NetconfMessageTransformUtil.toRpcError(e)).build()); - return; - } + //recursively processing message to eventually find matching request + processMessage(message); + return; + } - request.future.set(RpcResultBuilder.success(message).build()); + try { + NetconfMessageTransformUtil.checkSuccessReply(message); + } catch (final NetconfDocumentedException e) { + LOG.warn("{}: Error reply from remote device, request: {}, response: {}", + id, msgToS(request.request), msgToS(message), e); + + request.future.set(RpcResultBuilder.failed() + .withRpcError(NetconfMessageTransformUtil.toRpcError(e)) + .build()); + return; } + + request.future.set(RpcResultBuilder.success(message).build()); } private static String msgToS(final NetconfMessage msg) { @@ -343,18 +347,15 @@ public class NetconfDeviceCommunicator @Override public ListenableFuture> sendRequest(final NetconfMessage message, final QName rpc) { sessionLock.lock(); - - if (semaphore != null && !semaphore.tryAcquire()) { - LOG.warn("Limit of concurrent rpc messages was reached (limit :" - + concurentRpcMsgs + "). Rpc reply message is needed. Discarding request of Netconf device with id" - + id.getName()); - sessionLock.unlock(); - return Futures.immediateFailedFuture(new NetconfDocumentedException( - "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs - + ") waiting for emptying the queue of Netconf device with id" + id.getName())); - } - try { + if (semaphore != null && !semaphore.tryAcquire()) { + LOG.warn("Limit of concurrent rpc messages was reached (limit: {}). Rpc reply message is needed. " + + "Discarding request of Netconf device with id: {}", concurentRpcMsgs, id.name()); + return FluentFutures.immediateFailedFluentFuture(new NetconfDocumentedException( + "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs + + ") waiting for emptying the queue of Netconf device with id: " + id.name())); + } + return sendRequestWithLock(message, rpc); } finally { sessionLock.unlock(); @@ -367,16 +368,16 @@ public class NetconfDeviceCommunicator LOG.trace("{}: Sending message {}", id, msgToS(message)); } - if (session == null) { + if (currentSession == null) { LOG.warn("{}: Session is disconnected, failing RPC request {}", id, message); - return Futures.immediateFuture(createSessionDownRpcResult()); + return FluentFutures.immediateFluentFuture(createSessionDownRpcResult()); } final Request req = new Request(new UncancellableFuture<>(true), message); requests.add(req); - session.sendMessage(req.request).addListener(future -> { + currentSession.sendMessage(req.request).addListener(future -> { if (!future.isSuccess()) { // We expect that a session down will occur at this point LOG.debug("{}: Failed to send request {}", id, @@ -384,8 +385,7 @@ public class NetconfDeviceCommunicator future.cause()); if (future.cause() != null) { - req.future.set(createErrorRpcResult(RpcError.ErrorType.TRANSPORT, - future.cause().getLocalizedMessage())); + req.future.set(createErrorRpcResult(ErrorType.TRANSPORT, future.cause().getLocalizedMessage())); } else { req.future.set(createSessionDownRpcResult()); // assume session is down } @@ -425,4 +425,8 @@ public class NetconfDeviceCommunicator this.request = request; } } + + private boolean startClosing() { + return CLOSING_UPDATER.compareAndSet(this, 0, 1); + } }