From 2473df9920f0820fde7dcaf62eaf14166695a5f6 Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Tue, 16 Sep 2014 13:22:57 +0200 Subject: [PATCH 1/1] BUG-1842 Fix byte buffer handling for pending messages Change-Id: I54cd873e12d73779e20be280226480d090d9e8c5 Signed-off-by: Maros Marsalek --- opendaylight/netconf/netconf-it/pom.xml | 5 ++ .../netconf/it/NetconfITSecureTest.java | 78 +++++++++++++++---- .../src/test/resources/logback-test.xml | 1 + .../handler/ssh/client/AsyncSshHandler.java | 7 +- 4 files changed, 74 insertions(+), 17 deletions(-) diff --git a/opendaylight/netconf/netconf-it/pom.xml b/opendaylight/netconf/netconf-it/pom.xml index 272b686fc0..3a70a399bb 100644 --- a/opendaylight/netconf/netconf-it/pom.xml +++ b/opendaylight/netconf/netconf-it/pom.xml @@ -68,6 +68,11 @@ config-util test + + ${project.groupId} + sal-netconf-connector + test + ${project.groupId} netconf-api diff --git a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java index 4fe5f2a950..bc8efbe915 100644 --- a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java +++ b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java @@ -10,27 +10,35 @@ package org.opendaylight.controller.netconf.it; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import io.netty.channel.local.LocalAddress; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.auth.AuthProvider; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl; +import org.opendaylight.controller.netconf.client.NetconfClientSessionListener; import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener; import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder; @@ -42,7 +50,15 @@ import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator; import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil; import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil; import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.opendaylight.controller.sal.connect.api.RemoteDevice; +import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; import org.opendaylight.protocol.framework.NeverReconnectStrategy; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.xml.sax.SAXException; public class NetconfITSecureTest extends AbstractNetconfConfigTest { @@ -70,7 +86,7 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest { @Test public void testSecure() throws Exception { final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer()); - try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration())) { + try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration(new SimpleNetconfClientSessionListener()))) { NetconfMessage response = netconfClient.sendMessage(getGetConfig()); assertFalse("Unexpected error message " + XmlUtil.toString(response.getDocument()), NetconfMessageUtil.isErrorMessage(response)); @@ -91,29 +107,42 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest { /** * Test all requests are handled properly and no mismatch occurs in listener */ - @Test(timeout = 3*60*1000) + @Test(timeout = 5*60*1000) public void testSecureStress() throws Exception { + final int requests = 10000; + final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer()); - try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration())) { + final NetconfDeviceCommunicator sessionListener = getSessionListener(); + try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration(sessionListener))) { final AtomicInteger responseCounter = new AtomicInteger(0); - final List> futures = Lists.newArrayList(); + final List>> futures = Lists.newArrayList(); - final int requests = 1000; for (int i = 0; i < requests; i++) { - final Future netconfMessageFuture = netconfClient.sendRequest(getGetConfig()); + NetconfMessage getConfig = getGetConfig(); + getConfig = changeMessageId(getConfig, i); + final ListenableFuture> netconfMessageFuture = sessionListener.sendRequest(getConfig, QName.create("namespace", "2012-12-12", "get")); futures.add(netconfMessageFuture); - netconfMessageFuture.addListener(new GenericFutureListener>() { + Futures.addCallback(netconfMessageFuture, new FutureCallback>() { @Override - public void operationComplete(final Future future) throws Exception { - assertTrue("Request unsuccessful " + future.cause(), future.isSuccess()); + public void onSuccess(final RpcResult result) { responseCounter.incrementAndGet(); } + + @Override + public void onFailure(final Throwable t) { + throw new RuntimeException(t); + } }); } - for (final Future future : futures) { - future.await(); + // Wait for every future + for (final ListenableFuture> future : futures) { + try { + future.get(3, TimeUnit.MINUTES); + } catch (final TimeoutException e) { + fail("Request " + futures.indexOf(future) + " is not responding"); + } } // Give future listeners some time to finish counter incrementation @@ -123,10 +152,17 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest { } } - public NetconfClientConfiguration getClientConfiguration() throws IOException { + private NetconfMessage changeMessageId(final NetconfMessage getConfig, final int i) throws IOException, SAXException { + String s = XmlUtil.toString(getConfig.getDocument(), false); + s = s.replace("101", Integer.toString(i)); + return new NetconfMessage(XmlUtil.readXmlToDocument(s)); + } + + public NetconfClientConfiguration getClientConfiguration(final NetconfClientSessionListener sessionListener) throws IOException { final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create(); b.withAddress(TLS_ADDRESS); - b.withSessionListener(new SimpleNetconfClientSessionListener()); + // Using session listener from sal-netconf-connector since stress test cannot be performed with simple listener + b.withSessionListener(sessionListener); b.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)); b.withProtocol(NetconfClientConfiguration.NetconfClientProtocol.SSH); b.withConnectionTimeoutMillis(5000); @@ -134,6 +170,16 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest { return b.build(); } + @Mock + private RemoteDevice mockedRemoteDevice; + + private NetconfDeviceCommunicator getSessionListener() { + MockitoAnnotations.initMocks(this); + doNothing().when(mockedRemoteDevice).onRemoteSessionUp(any(NetconfSessionCapabilities.class), any(RemoteDeviceCommunicator.class)); + doNothing().when(mockedRemoteDevice).onRemoteSessionDown(); + return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test"), mockedRemoteDevice); + } + public AuthProvider getAuthProvider() throws Exception { final AuthProvider mockAuth = mock(AuthProvider.class); doReturn("mockedAuth").when(mockAuth).toString(); diff --git a/opendaylight/netconf/netconf-it/src/test/resources/logback-test.xml b/opendaylight/netconf/netconf-it/src/test/resources/logback-test.xml index c5037d34ed..91fb805e6a 100644 --- a/opendaylight/netconf/netconf-it/src/test/resources/logback-test.xml +++ b/opendaylight/netconf/netconf-it/src/test/resources/logback-test.xml @@ -7,6 +7,7 @@ + diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 369c013832..3d1e4784f2 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -328,6 +328,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { ", remote window is not getting read or is too small")); } + // We need to reset buffer read index, since we've already read it when we tried to write it the first time + ((ByteBuf) msg).resetReaderIndex(); logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter); // In case of pending, re-invoke write after pending is finished @@ -335,12 +337,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { lastWriteFuture.addListener(new SshFutureListener() { @Override public void operationComplete(final IoWriteFuture future) { + // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first + // External thread could trigger write on this instance while we are on this line + // Verify if (future.isWritten()) { synchronized (SshWriteAsyncHandler.this) { // Pending done, decrease counter pendingWriteCounter--; + write(ctx, msg, promise); } - write(ctx, msg, promise); } else { // Cannot reschedule pending, fail handlePendingFailed(ctx, e); -- 2.36.6