X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Fconnect%2Fnetconf%2Flistener%2FNetconfDeviceCommunicator.java;h=260beaf59a3b4cc50ffbc05455cd276d38c498ca;hb=ab97282bcb83ee4510d18b33149bdd90c0863af3;hp=e979071d873f3c7c2770c545cfa9cbfc65480cc9;hpb=a1be400609994146d8a2b23ec3b3b89e59619ea3;p=netconf.git diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java index e979071d87..260beaf59a 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java @@ -20,6 +20,8 @@ import java.util.ArrayDeque; import java.util.Iterator; import java.util.List; import java.util.Queue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.opendaylight.controller.config.util.xml.XmlElement; @@ -53,29 +55,45 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, protected final RemoteDeviceId id; private final Lock sessionLock = new ReentrantLock(); - // TODO implement concurrent message limit + private final Semaphore semaphore; + private final int concurentRpcMsgs; + private final Queue requests = new ArrayDeque<>(); private NetconfClientSession session; private Future initFuture; - private SettableFuture firstConnectionFuture; + private final SettableFuture firstConnectionFuture; + + // isSessionClosing indicates a close operation on the session is issued and + // tearDown will surely be called later to finish the close. + // Used to allow only one thread to enter tearDown and other threads should + // NOT enter it simultaneously and should end its close operation without + // calling tearDown to release the locks they hold to avoid deadlock. + private final AtomicBoolean isSessionClosing = new AtomicBoolean(false); + + public Boolean isSessionClosing() { + return isSessionClosing.get(); + } public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, - final UserPreferences NetconfSessionPreferences) { - this(id, remoteDevice, Optional.of(NetconfSessionPreferences)); + final UserPreferences NetconfSessionPreferences, final int rpcMessageLimit) { + this(id, remoteDevice, Optional.of(NetconfSessionPreferences), rpcMessageLimit); } public NetconfDeviceCommunicator(final RemoteDeviceId id, - final RemoteDevice remoteDevice) { - this(id, remoteDevice, Optional.absent()); + final RemoteDevice remoteDevice, + final int rpcMessageLimit) { + this(id, remoteDevice, Optional.absent(), rpcMessageLimit); } private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, - final Optional overrideNetconfCapabilities) { + final Optional overrideNetconfCapabilities, final int rpcMessageLimit) { + this.concurentRpcMsgs = rpcMessageLimit; this.id = id; this.remoteDevice = remoteDevice; this.overrideNetconfCapabilities = overrideNetconfCapabilities; this.firstConnectionFuture = SettableFuture.create(); + this.semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null; } @Override @@ -90,16 +108,20 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, LOG.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences); - if(overrideNetconfCapabilities.isPresent()) { - netconfSessionPreferences = overrideNetconfCapabilities.get().isOverride() ? - netconfSessionPreferences.replaceModuleCaps(overrideNetconfCapabilities.get().getSessionPreferences()) : - netconfSessionPreferences.addModuleCaps(overrideNetconfCapabilities.get().getSessionPreferences()); - LOG.debug( - "{}: Session capabilities overridden, capabilities that will be used: {}", - id, netconfSessionPreferences); + if (overrideNetconfCapabilities.isPresent()) { + final NetconfSessionPreferences sessionPreferences = overrideNetconfCapabilities + .get().getSessionPreferences(); + netconfSessionPreferences = overrideNetconfCapabilities.get().moduleBasedCapsOverrided() + ? netconfSessionPreferences.replaceModuleCaps(sessionPreferences) + : netconfSessionPreferences.addModuleCaps(sessionPreferences); + + netconfSessionPreferences = overrideNetconfCapabilities.get().nonModuleBasedCapsOverrided() + ? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences) + : netconfSessionPreferences.addNonModuleCaps(sessionPreferences); + LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, + netconfSessionPreferences); } - remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this); if (!firstConnectionFuture.isDone()) { firstConnectionFuture.set(netconfSessionPreferences.getNetconfDeviceCapabilities()); @@ -128,7 +150,7 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, initFuture.addListener(new GenericFutureListener>(){ @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(final Future future) throws Exception { if (!future.isSuccess() && !future.isCancelled()) { LOG.debug("{}: Connection failed", id, future.cause()); NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause()); @@ -142,14 +164,18 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, } public void disconnect() { - if(session != null) { + // If session is already in closing, no need to close it again + if(session != null && isSessionClosing.compareAndSet(false, true)) { session.close(); } } - private void tearDown( String reason ) { + private void tearDown(final String reason) { + if (!isSessionClosing()) { + LOG.warn("It's curious that no one to close the session but tearDown is called!"); + } LOG.debug("Tearing down {}", reason); - List>> futuresToCancel = Lists.newArrayList(); + final List>> futuresToCancel = Lists.newArrayList(); sessionLock.lock(); try { if( session != null ) { @@ -180,13 +206,15 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, // Notify pending request futures outside of the sessionLock to avoid unnecessarily // blocking the caller. - for( UncancellableFuture> future: futuresToCancel ) { + for (final UncancellableFuture> future : futuresToCancel) { if( Strings.isNullOrEmpty( reason ) ) { future.set( createSessionDownRpcResult() ); } else { future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, reason ) ); } } + + isSessionClosing.set(false); } private RpcResult createSessionDownRpcResult() { @@ -194,19 +222,23 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, String.format( "The netconf session to %1$s is disconnected", id.getName() ) ); } - private RpcResult createErrorRpcResult( RpcError.ErrorType errorType, String message ) { + private RpcResult createErrorRpcResult(final RpcError.ErrorType errorType, final String message) { return RpcResultBuilder.failed() - .withError(errorType, NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(), message).build(); + .withError(errorType, NetconfDocumentedException.ErrorTag.OPERATION_FAILED.getTagValue(), message).build(); } @Override public void onSessionDown(final NetconfClientSession session, final Exception e) { - LOG.warn("{}: Session went down", id, e); - tearDown( null ); + // If session is already in closing, no need to call tearDown again. + if (isSessionClosing.compareAndSet(false, true)) { + LOG.warn("{}: Session went down", id, e); + tearDown( null ); + } } @Override public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) { + // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing. LOG.warn("{}: Session terminated {}", id, reason); tearDown( reason.getErrorMessage() ); } @@ -218,10 +250,8 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, initFuture.cancel(false); } // Disconnect from device - if(session != null) { - session.close(); - // tear down not necessary, called indirectly by above close - } + // tear down not necessary, called indirectly by the close in disconnect() + disconnect(); } @Override @@ -245,6 +275,11 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, request = requests.peek(); if (request != null && request.future.isUncancellable()) { requests.poll(); + // we have just removed one request from the queue + // we can also release one permit + if(semaphore != null) { + semaphore.release(); + } } else { request = null; LOG.warn("{}: Ignoring unsolicited message {}", id, @@ -302,8 +337,17 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, @Override public ListenableFuture> sendRequest(final NetconfMessage message, final QName rpc) { sessionLock.lock(); + + if (semaphore != null && !semaphore.tryAcquire()) { + LOG.warn("Limit of concurrent rpc messages was reached (limit :" + + concurentRpcMsgs + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName()); + sessionLock.unlock(); + return Futures.immediateFailedFuture(new NetconfDocumentedException("Limit of rpc messages was reached (Limit :" + + concurentRpcMsgs + ") waiting for emptying the queue of Netconf device with id" + id.getName())); + } + try { - return sendRequestWithLock( message, rpc ); + return sendRequestWithLock(message, rpc); } finally { sessionLock.unlock(); } @@ -321,8 +365,7 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, return Futures.immediateFuture( createSessionDownRpcResult() ); } - final Request req = new Request( new UncancellableFuture>(true), - message ); + final Request req = new Request(new UncancellableFuture<>(true), message); requests.add(req); session.sendMessage(req.request).addListener(new FutureListener() {