X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Fconnect%2Fnetconf%2Flistener%2FNetconfDeviceCommunicator.java;h=0e240dde64d74b728da856540d8f57c0f71a96bb;hb=16d0b397db62207e42648e6298b7c826a4858b77;hp=4fbd6f624145eac163c8bdb2cc61b3384782907d;hpb=baef34a135c0ee49c7be9cfe7bf806905c96d661;p=netconf.git diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java index 4fbd6f6241..0e240dde64 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java @@ -14,21 +14,21 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.GenericFutureListener; import java.util.ArrayDeque; import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.opendaylight.controller.config.util.xml.XmlElement; -import org.opendaylight.controller.config.util.xml.XmlUtil; +import org.opendaylight.netconf.api.FailedNetconfMessage; import org.opendaylight.netconf.api.NetconfDocumentedException; import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfTerminationReason; +import org.opendaylight.netconf.api.xml.XmlElement; import org.opendaylight.netconf.api.xml.XmlNetconfConstants; +import org.opendaylight.netconf.api.xml.XmlUtil; import org.opendaylight.netconf.client.NetconfClientDispatcher; import org.opendaylight.netconf.client.NetconfClientSession; import org.opendaylight.netconf.client.NetconfClientSessionListener; @@ -45,7 +45,8 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator { +public class NetconfDeviceCommunicator + implements NetconfClientSessionListener, RemoteDeviceCommunicator { private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class); @@ -58,24 +59,40 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, private final int concurentRpcMsgs; private final Queue requests = new ArrayDeque<>(); - private NetconfClientSession session; + private NetconfClientSession currentSession; private Future initFuture; - private SettableFuture firstConnectionFuture; + private final SettableFuture firstConnectionFuture; - public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, - final UserPreferences NetconfSessionPreferences, final int rpcMessageLimit) { - this(id, remoteDevice, Optional.of(NetconfSessionPreferences), rpcMessageLimit); + // isSessionClosing indicates a close operation on the session is issued and + // tearDown will surely be called later to finish the close. + // Used to allow only one thread to enter tearDown and other threads should + // NOT enter it simultaneously and should end its close operation without + // calling tearDown to release the locks they hold to avoid deadlock. + private final AtomicBoolean isSessionClosing = new AtomicBoolean(false); + + public Boolean isSessionClosing() { + return isSessionClosing.get(); + } + + public NetconfDeviceCommunicator( + final RemoteDeviceId id, + final RemoteDevice remoteDevice, + final UserPreferences netconfSessionPreferences, final int rpcMessageLimit) { + this(id, remoteDevice, Optional.of(netconfSessionPreferences), rpcMessageLimit); } - public NetconfDeviceCommunicator(final RemoteDeviceId id, - final RemoteDevice remoteDevice, - final int rpcMessageLimit) { + public NetconfDeviceCommunicator( + final RemoteDeviceId id, + final RemoteDevice remoteDevice, + final int rpcMessageLimit) { this(id, remoteDevice, Optional.absent(), rpcMessageLimit); } - private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, - final Optional overrideNetconfCapabilities, final int rpcMessageLimit) { + private NetconfDeviceCommunicator( + final RemoteDeviceId id, + final RemoteDevice remoteDevice, + final Optional overrideNetconfCapabilities, final int rpcMessageLimit) { this.concurentRpcMsgs = rpcMessageLimit; this.id = id; this.remoteDevice = remoteDevice; @@ -89,58 +106,59 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, sessionLock.lock(); try { LOG.debug("{}: Session established", id); - this.session = session; + currentSession = session; NetconfSessionPreferences netconfSessionPreferences = NetconfSessionPreferences.fromNetconfSession(session); LOG.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences); - if(overrideNetconfCapabilities.isPresent()) { - netconfSessionPreferences = overrideNetconfCapabilities.get().isOverride() ? - netconfSessionPreferences.replaceModuleCaps(overrideNetconfCapabilities.get().getSessionPreferences()) : - netconfSessionPreferences.addModuleCaps(overrideNetconfCapabilities.get().getSessionPreferences()); - LOG.debug( - "{}: Session capabilities overridden, capabilities that will be used: {}", - id, netconfSessionPreferences); + if (overrideNetconfCapabilities.isPresent()) { + final NetconfSessionPreferences sessionPreferences = overrideNetconfCapabilities + .get().getSessionPreferences(); + netconfSessionPreferences = overrideNetconfCapabilities.get().moduleBasedCapsOverrided() + ? netconfSessionPreferences.replaceModuleCaps(sessionPreferences) + : netconfSessionPreferences.addModuleCaps(sessionPreferences); + + netconfSessionPreferences = overrideNetconfCapabilities.get().nonModuleBasedCapsOverrided() + ? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences) + : netconfSessionPreferences.addNonModuleCaps(sessionPreferences); + LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, + netconfSessionPreferences); } - remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this); if (!firstConnectionFuture.isDone()) { firstConnectionFuture.set(netconfSessionPreferences.getNetconfDeviceCapabilities()); } - } - finally { + } finally { sessionLock.unlock(); } } /** + * Initialize remote connection. * - * @param dispatcher - * @param config + * @param dispatcher {@code NetconfCLientDispatcher} + * @param config {@code NetconfClientConfiguration} * @return future that returns succes on first succesfull connection and failure when the underlying - * reconnecting strategy runs out of reconnection attempts + * reconnecting strategy runs out of reconnection attempts */ - public ListenableFuture initializeRemoteConnection(final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) { - if(config instanceof NetconfReconnectingClientConfiguration) { + public ListenableFuture initializeRemoteConnection( + final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) { + if (config instanceof NetconfReconnectingClientConfiguration) { initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config); } else { initFuture = dispatcher.createClient(config); } - initFuture.addListener(new GenericFutureListener>(){ - - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess() && !future.isCancelled()) { - LOG.debug("{}: Connection failed", id, future.cause()); - NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause()); - if (firstConnectionFuture.isDone()) { - firstConnectionFuture.setException(future.cause()); - } + initFuture.addListener(future -> { + if (!future.isSuccess() && !future.isCancelled()) { + LOG.debug("{}: Connection failed", id, future.cause()); + NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause()); + if (firstConnectionFuture.isDone()) { + firstConnectionFuture.setException(future.cause()); } } }); @@ -148,19 +166,22 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, } public void disconnect() { - if(session != null) { - session.close(); + // If session is already in closing, no need to close it again + if (currentSession != null && isSessionClosing.compareAndSet(false, true)) { + currentSession.close(); } } - private void tearDown( String reason ) { + private void tearDown(final String reason) { + if (!isSessionClosing()) { + LOG.warn("It's curious that no one to close the session but tearDown is called!"); + } LOG.debug("Tearing down {}", reason); - List>> futuresToCancel = Lists.newArrayList(); + final List>> futuresToCancel = Lists.newArrayList(); sessionLock.lock(); try { - if( session != null ) { - session = null; - + if (currentSession != null) { + currentSession = null; /* * Walk all requests, check if they have been executing * or cancelled and remove them from the queue. @@ -169,7 +190,7 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, while (it.hasNext()) { final Request r = it.next(); if (r.future.isUncancellable()) { - futuresToCancel.add( r.future ); + futuresToCancel.add(r.future); it.remove(); } else if (r.future.isCancelled()) { // This just does some house-cleaning @@ -179,55 +200,59 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, remoteDevice.onRemoteSessionDown(); } - } - finally { + } finally { sessionLock.unlock(); } // Notify pending request futures outside of the sessionLock to avoid unnecessarily // blocking the caller. - for( UncancellableFuture> future: futuresToCancel ) { - if( Strings.isNullOrEmpty( reason ) ) { - future.set( createSessionDownRpcResult() ); + for (final UncancellableFuture> future : futuresToCancel) { + if (Strings.isNullOrEmpty(reason)) { + future.set(createSessionDownRpcResult()); } else { - future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, reason ) ); + future.set(createErrorRpcResult(RpcError.ErrorType.TRANSPORT, reason)); } } + + isSessionClosing.set(false); } private RpcResult createSessionDownRpcResult() { - return createErrorRpcResult( RpcError.ErrorType.TRANSPORT, - String.format( "The netconf session to %1$s is disconnected", id.getName() ) ); + return createErrorRpcResult(RpcError.ErrorType.TRANSPORT, + String.format("The netconf session to %1$s is disconnected", id.getName())); } - private RpcResult createErrorRpcResult( RpcError.ErrorType errorType, String message ) { + private static RpcResult createErrorRpcResult(final RpcError.ErrorType errorType, + final String message) { return RpcResultBuilder.failed() - .withError(errorType, NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(), message).build(); + .withError(errorType, NetconfDocumentedException.ErrorTag.OPERATION_FAILED.getTagValue(), message).build(); } @Override - public void onSessionDown(final NetconfClientSession session, final Exception e) { - LOG.warn("{}: Session went down", id, e); - tearDown( null ); + public void onSessionDown(final NetconfClientSession session, final Exception exception) { + // If session is already in closing, no need to call tearDown again. + if (isSessionClosing.compareAndSet(false, true)) { + LOG.warn("{}: Session went down", id, exception); + tearDown(null); + } } @Override public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) { + // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing. LOG.warn("{}: Session terminated {}", id, reason); - tearDown( reason.getErrorMessage() ); + tearDown(reason.getErrorMessage()); } @Override public void close() { // Cancel reconnect if in progress - if(initFuture != null) { + if (initFuture != null) { initFuture.cancel(false); } // Disconnect from device - if(session != null) { - session.close(); - // tear down not necessary, called indirectly by above close - } + // tear down not necessary, called indirectly by the close in disconnect() + disconnect(); } @Override @@ -250,10 +275,10 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, try { request = requests.peek(); if (request != null && request.future.isUncancellable()) { - requests.poll(); + request = requests.poll(); // we have just removed one request from the queue // we can also release one permit - if(semaphore != null) { + if (semaphore != null) { semaphore.release(); } } else { @@ -261,28 +286,33 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, LOG.warn("{}: Ignoring unsolicited message {}", id, msgToS(message)); } - } - finally { + } finally { sessionLock.unlock(); } - if( request != null ) { + if (request != null) { + + if (FailedNetconfMessage.class.isInstance(message)) { + request.future.set(NetconfMessageTransformUtil.toRpcResult((FailedNetconfMessage) message)); + return; + } LOG.debug("{}: Message received {}", id, message); - if(LOG.isTraceEnabled()) { - LOG.trace( "{}: Matched request: {} to response: {}", id, msgToS( request.request ), msgToS( message ) ); + if (LOG.isTraceEnabled()) { + LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message)); } try { - NetconfMessageTransformUtil.checkValidReply( request.request, message ); + NetconfMessageTransformUtil.checkValidReply(request.request, message); } catch (final NetconfDocumentedException e) { LOG.warn( - "{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}", + "{}: Invalid request-reply match," + + "reply message contains different message-id, request: {}, response: {}", id, msgToS(request.request), msgToS(message), e); - request.future.set( RpcResultBuilder.failed() - .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() ); + request.future.set(RpcResultBuilder.failed() + .withRpcError(NetconfMessageTransformUtil.toRpcError(e)).build()); //recursively processing message to eventually find matching request processMessage(message); @@ -292,17 +322,17 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, try { NetconfMessageTransformUtil.checkSuccessReply(message); - } catch(final NetconfDocumentedException e) { + } catch (final NetconfDocumentedException e) { LOG.warn( "{}: Error reply from remote device, request: {}, response: {}", id, msgToS(request.request), msgToS(message), e); - request.future.set( RpcResultBuilder.failed() - .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() ); + request.future.set(RpcResultBuilder.failed() + .withRpcError(NetconfMessageTransformUtil.toRpcError(e)).build()); return; } - request.future.set( RpcResultBuilder.success( message ).build() ); + request.future.set(RpcResultBuilder.success(message).build()); } } @@ -313,58 +343,52 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, @Override public ListenableFuture> sendRequest(final NetconfMessage message, final QName rpc) { sessionLock.lock(); - - if (semaphore != null && !semaphore.tryAcquire()) { - LOG.warn("Limit of concurrent rpc messages was reached (limit :" + - concurentRpcMsgs + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName()); - sessionLock.unlock(); - return Futures.immediateFailedFuture(new NetconfDocumentedException("Limit of rpc messages was reached (Limit :" + - concurentRpcMsgs + ") waiting for emptying the queue of Netconf device with id" + id.getName())); - } - try { + if (semaphore != null && !semaphore.tryAcquire()) { + LOG.warn("Limit of concurrent rpc messages was reached (limit :" + concurentRpcMsgs + + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName()); + return Futures.immediateFailedFuture(new NetconfDocumentedException( + "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs + + ") waiting for emptying the queue of Netconf device with id" + id.getName())); + } + return sendRequestWithLock(message, rpc); } finally { sessionLock.unlock(); } } - private ListenableFuture> sendRequestWithLock( - final NetconfMessage message, final QName rpc) { - if(LOG.isTraceEnabled()) { + private ListenableFuture> sendRequestWithLock(final NetconfMessage message, + final QName rpc) { + if (LOG.isTraceEnabled()) { LOG.trace("{}: Sending message {}", id, msgToS(message)); } - if (session == null) { + if (currentSession == null) { LOG.warn("{}: Session is disconnected, failing RPC request {}", id, message); - return Futures.immediateFuture( createSessionDownRpcResult() ); + return Futures.immediateFuture(createSessionDownRpcResult()); } - final Request req = new Request( new UncancellableFuture>(true), - message ); + final Request req = new Request(new UncancellableFuture<>(true), message); requests.add(req); - session.sendMessage(req.request).addListener(new FutureListener() { - @Override - public void operationComplete(final Future future) throws Exception { - if( !future.isSuccess() ) { - // We expect that a session down will occur at this point - LOG.debug("{}: Failed to send request {}", id, - XmlUtil.toString(req.request.getDocument()), - future.cause()); - - if( future.cause() != null ) { - req.future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, - future.cause().getLocalizedMessage() ) ); - } else { - req.future.set( createSessionDownRpcResult() ); // assume session is down - } - req.future.setException( future.cause() ); - } - else { - LOG.trace("Finished sending request {}", req.request); + currentSession.sendMessage(req.request).addListener(future -> { + if (!future.isSuccess()) { + // We expect that a session down will occur at this point + LOG.debug("{}: Failed to send request {}", id, + XmlUtil.toString(req.request.getDocument()), + future.cause()); + + if (future.cause() != null) { + req.future.set(createErrorRpcResult(RpcError.ErrorType.TRANSPORT, + future.cause().getLocalizedMessage())); + } else { + req.future.set(createSessionDownRpcResult()); // assume session is down } + req.future.setException(future.cause()); + } else { + LOG.trace("Finished sending request {}", req.request); } }); @@ -372,7 +396,7 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, } private void processNotification(final NetconfMessage notification) { - if(LOG.isTraceEnabled()) { + if (LOG.isTraceEnabled()) { LOG.trace("{}: Notification received: {}", id, notification); } @@ -380,6 +404,10 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, } private static boolean isNotification(final NetconfMessage message) { + if (message.getDocument() == null) { + // We have no message, which mean we have a FailedNetconfMessage + return false; + } final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument()); return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ; }