X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fsal%2Fconnect%2Fnetconf%2Flistener%2FNetconfDeviceCommunicator.java;h=93ac945f2d2c9c9cc8479796ada6a27906bab102;hb=ad5f02d0c4ddcc415ae5785c7c74080809a7eb6c;hp=e979071d873f3c7c2770c545cfa9cbfc65480cc9;hpb=a9b8c477d5b128ae42f6cb5dae8b083ddf2c4022;p=netconf.git diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java index e979071d87..93ac945f2d 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java @@ -20,6 +20,8 @@ import java.util.ArrayDeque; import java.util.Iterator; import java.util.List; import java.util.Queue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.opendaylight.controller.config.util.xml.XmlElement; @@ -53,29 +55,45 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, protected final RemoteDeviceId id; private final Lock sessionLock = new ReentrantLock(); - // TODO implement concurrent message limit + private final Semaphore semaphore; + private final int concurentRpcMsgs; + private final Queue requests = new ArrayDeque<>(); private NetconfClientSession session; private Future initFuture; private SettableFuture firstConnectionFuture; + // isSessionClosing indicates a close operation on the session is issued and + // tearDown will surely be called later to finish the close. + // Used to allow only one thread to enter tearDown and other threads should + // NOT enter it simultaneously and should end its close operation without + // calling tearDown to release the locks they hold to avoid deadlock. + private volatile AtomicBoolean isSessionClosing = new AtomicBoolean(false); + + public Boolean isSessionClosing() { + return isSessionClosing.get(); + } + public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, - final UserPreferences NetconfSessionPreferences) { - this(id, remoteDevice, Optional.of(NetconfSessionPreferences)); + final UserPreferences NetconfSessionPreferences, final int rpcMessageLimit) { + this(id, remoteDevice, Optional.of(NetconfSessionPreferences), rpcMessageLimit); } public NetconfDeviceCommunicator(final RemoteDeviceId id, - final RemoteDevice remoteDevice) { - this(id, remoteDevice, Optional.absent()); + final RemoteDevice remoteDevice, + final int rpcMessageLimit) { + this(id, remoteDevice, Optional.absent(), rpcMessageLimit); } private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, - final Optional overrideNetconfCapabilities) { + final Optional overrideNetconfCapabilities, final int rpcMessageLimit) { + this.concurentRpcMsgs = rpcMessageLimit; this.id = id; this.remoteDevice = remoteDevice; this.overrideNetconfCapabilities = overrideNetconfCapabilities; this.firstConnectionFuture = SettableFuture.create(); + this.semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null; } @Override @@ -142,12 +160,16 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, } public void disconnect() { - if(session != null) { + // If session is already in closing, no need to close it again + if(session != null && isSessionClosing.compareAndSet(false, true)) { session.close(); } } private void tearDown( String reason ) { + if (!isSessionClosing()) { + LOG.warn("It's curious that no one to close the session but tearDown is called!"); + } LOG.debug("Tearing down {}", reason); List>> futuresToCancel = Lists.newArrayList(); sessionLock.lock(); @@ -187,6 +209,8 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, reason ) ); } } + + isSessionClosing.set(false); } private RpcResult createSessionDownRpcResult() { @@ -201,12 +225,16 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, @Override public void onSessionDown(final NetconfClientSession session, final Exception e) { - LOG.warn("{}: Session went down", id, e); - tearDown( null ); + // If session is already in closing, no need to call tearDown again. + if (isSessionClosing.compareAndSet(false, true)) { + LOG.warn("{}: Session went down", id, e); + tearDown( null ); + } } @Override public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) { + // onSessionTerminated is called directly by disconnect, no need to compare and set isSessionClosing. LOG.warn("{}: Session terminated {}", id, reason); tearDown( reason.getErrorMessage() ); } @@ -245,6 +273,11 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, request = requests.peek(); if (request != null && request.future.isUncancellable()) { requests.poll(); + // we have just removed one request from the queue + // we can also release one permit + if(semaphore != null) { + semaphore.release(); + } } else { request = null; LOG.warn("{}: Ignoring unsolicited message {}", id, @@ -302,8 +335,17 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, @Override public ListenableFuture> sendRequest(final NetconfMessage message, final QName rpc) { sessionLock.lock(); + + if (semaphore != null && !semaphore.tryAcquire()) { + LOG.warn("Limit of concurrent rpc messages was reached (limit :" + + concurentRpcMsgs + "). Rpc reply message is needed. Discarding request of Netconf device with id" + id.getName()); + sessionLock.unlock(); + return Futures.immediateFailedFuture(new NetconfDocumentedException("Limit of rpc messages was reached (Limit :" + + concurentRpcMsgs + ") waiting for emptying the queue of Netconf device with id" + id.getName())); + } + try { - return sendRequestWithLock( message, rpc ); + return sendRequestWithLock(message, rpc); } finally { sessionLock.unlock(); }