From 3c8f8344ef76007937f3738848da4146dc70e455 Mon Sep 17 00:00:00 2001 From: Peter Suna Date: Tue, 18 Jun 2024 18:13:36 +0200 Subject: [PATCH] Release semaphore after disconnection Ensure semaphore is released when requests are removed in the tearDown method. This ensures that the concurrent RPC limit corresponds accurately to the actual requests. JIRA: NETCONF-1332 Change-Id: Id677b5d05805d50c193235f6b2a594bb76f0ef59 Signed-off-by: Peter Suna --- .../mdsal/NetconfDeviceCommunicator.java | 5 +++ .../mdsal/NetconfDeviceCommunicatorTest.java | 41 +++++++++++++++++-- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/NetconfDeviceCommunicator.java b/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/NetconfDeviceCommunicator.java index 5c16c12e88..5fa15b39b5 100644 --- a/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/NetconfDeviceCommunicator.java +++ b/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/NetconfDeviceCommunicator.java @@ -166,6 +166,11 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, if (r.future.isUncancellable()) { futuresToCancel.add(r.future); it.remove(); + // we have just removed one request from the queue + // we can also release one permit + if (semaphore != null) { + semaphore.release(); + } } else if (r.future.isCancelled()) { // This just does some house-cleaning it.remove(); diff --git a/plugins/netconf-client-mdsal/src/test/java/org/opendaylight/netconf/client/mdsal/NetconfDeviceCommunicatorTest.java b/plugins/netconf-client-mdsal/src/test/java/org/opendaylight/netconf/client/mdsal/NetconfDeviceCommunicatorTest.java index 3af378a76d..3f4dffdf89 100644 --- a/plugins/netconf-client-mdsal/src/test/java/org/opendaylight/netconf/client/mdsal/NetconfDeviceCommunicatorTest.java +++ b/plugins/netconf-client-mdsal/src/test/java/org/opendaylight/netconf/client/mdsal/NetconfDeviceCommunicatorTest.java @@ -16,12 +16,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.google.common.base.CharMatcher; @@ -68,6 +70,7 @@ import org.opendaylight.yangtools.yang.common.Uint32; @ExtendWith(MockitoExtension.class) class NetconfDeviceCommunicatorTest { private static final SessionIdType SESSION_ID = new SessionIdType(Uint32.ONE); + private static final int RPC_MESSAGE_LIMIT = 10; @Mock private RemoteDevice mockDevice; @@ -81,8 +84,8 @@ class NetconfDeviceCommunicatorTest { @BeforeEach void setUp() { - communicator = new NetconfDeviceCommunicator( - new RemoteDeviceId("test", InetSocketAddress.createUnresolved("localhost", 22)), mockDevice, 10); + communicator = new NetconfDeviceCommunicator(new RemoteDeviceId("test", + InetSocketAddress.createUnresolved("localhost", 22)), mockDevice, RPC_MESSAGE_LIMIT); spySession = spy(new NetconfClientSession(mock(NetconfClientSessionListener.class), mock(Channel.class), SESSION_ID, Set.of())); } @@ -212,6 +215,38 @@ class NetconfDeviceCommunicatorTest { verify(mockDevice, never()).onRemoteSessionDown(); } + @Test + void testMessageLimitAfterDisconnect() throws Exception { + // Prepare environment. + setupSession(); + final var futureListener = ArgumentCaptor.forClass(GenericFutureListener.class); + doReturn(mockChannelFuture).when(mockChannelFuture).addListener(futureListener.capture()); + final var message = new NetconfMessage(UntrustedXML.newDocumentBuilder().newDocument()); + doReturn(mockChannelFuture).when(spySession).sendMessage(same(message)); + doReturn(true).when(spySession).isUp(); + doAnswer(invocationOnMock -> { + communicator.onSessionTerminated(spySession, new NetconfTerminationReason("Session closed")); + return null; + }).when(spySession).close(); + + // Reach max-connection-attempts. + for (int i = 1; i <= RPC_MESSAGE_LIMIT; i++) { + final var resultFuture = communicator.sendRequest(message); + assertInstanceOf(UncancellableFuture.class, resultFuture, + String.format("The resultFuture has an incorrect type: %s", resultFuture)); + verify(spySession, times(i)).sendMessage(same(message)); + communicator.disconnect(); + communicator.onSessionUp(spySession); + } + + // Verify that more requests can be sent because the semaphore counter is not 0. + final var resultFuture = communicator.sendRequest(message); + assertInstanceOf(UncancellableFuture.class, resultFuture, + String.format("The resultFuture has an incorrect type: %s", resultFuture)); + verify(spySession, times(RPC_MESSAGE_LIMIT + 1)).sendMessage(same(message)); + verify(mockChannelFuture, times(RPC_MESSAGE_LIMIT + 1)).addListener(futureListener.capture()); + } + @SuppressWarnings("unchecked") @Test void testSendRequest() throws Exception { @@ -388,7 +423,7 @@ class NetconfDeviceCommunicatorTest { setupSession(); final var messageID = new ArrayList(); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < RPC_MESSAGE_LIMIT; i++) { messageID.add(UUID.randomUUID().toString()); final var resultFuture = sendRequest(messageID.get(i), false); assertInstanceOf(UncancellableFuture.class, resultFuture, "ListenableFuture is null"); -- 2.36.6