From: Tomas Cere Date: Fri, 8 Jan 2016 12:09:02 +0000 (+0100) Subject: BUG 4547: Make sure all channel writes are from a netty thread. X-Git-Tag: release/beryllium~33^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=99db40a9ad885904a619cb77b2b541a8292b54bd;p=netconf.git BUG 4547: Make sure all channel writes are from a netty thread. When we were in restconf thread some writes were executed from jetty thread, we need to make sure all channel writes are coming from netty thread to preserve message ordering. Change-Id: I6ca8dda82c11338b1ff590c808f12fa0def9ca33 Signed-off-by: Tomas Cere --- diff --git a/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/NetconfClientSessionNegotiatorTest.java b/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/NetconfClientSessionNegotiatorTest.java index 7cda015bd4..e3006c7bb1 100644 --- a/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/NetconfClientSessionNegotiatorTest.java +++ b/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/NetconfClientSessionNegotiatorTest.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelProgressivePromise; +import io.netty.channel.EventLoop; import io.netty.handler.ssl.SslHandler; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; @@ -36,6 +37,8 @@ import org.junit.Test; import org.mockito.internal.util.collections.Sets; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opendaylight.netconf.api.NetconfClientSessionPreferences; +import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.nettyutil.handler.ChunkedFramingMechanismEncoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder; @@ -43,8 +46,6 @@ import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage; import org.opendaylight.netconf.util.messages.NetconfHelloMessage; import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader; import org.opendaylight.netconf.util.test.XmlFileLoader; -import org.opendaylight.netconf.api.NetconfClientSessionPreferences; -import org.opendaylight.netconf.api.NetconfMessage; import org.openexi.proc.common.EXIOptions; import org.w3c.dom.Document; @@ -62,6 +63,7 @@ public class NetconfClientSessionNegotiatorTest { pipeline = mockChannelPipeline(); future = mockChannelFuture(); channel = mockChannel(); + mockEventLoop(); } private static ChannelHandler mockChannelHandler() { @@ -103,6 +105,20 @@ public class NetconfClientSessionNegotiatorTest { return pipeline; } + private void mockEventLoop() { + final EventLoop eventLoop = mock(EventLoop.class); + doReturn(eventLoop).when(channel).eventLoop(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final Object[] args = invocation.getArguments(); + final Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + }).when(eventLoop).execute(any(Runnable.class)); + } + private NetconfClientSessionNegotiator createNetconfClientSessionNegotiator(final Promise promise, final NetconfMessage startExi) { ChannelProgressivePromise progressivePromise = mock(ChannelProgressivePromise.class); diff --git a/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/SimpleNetconfClientSessionListenerTest.java b/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/SimpleNetconfClientSessionListenerTest.java index f13d810fad..0bc9b04d8c 100644 --- a/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/SimpleNetconfClientSessionListenerTest.java +++ b/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/SimpleNetconfClientSessionListenerTest.java @@ -10,7 +10,9 @@ package org.opendaylight.netconf.client; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -18,11 +20,15 @@ import static org.mockito.Mockito.verify; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoop; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.util.Set; import org.junit.Before; import org.junit.Test; import org.mockito.internal.util.collections.Sets; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.opendaylight.netconf.util.messages.NetconfHelloMessage; import org.opendaylight.netconf.api.NetconfMessage; @@ -40,7 +46,9 @@ public class SimpleNetconfClientSessionListenerTest { public void setUp() throws Exception { channel = mock(Channel.class); channelFuture = mock(ChannelFuture.class); + mockEventLoop(); doReturn(channelFuture).when(channel).writeAndFlush(anyObject()); + doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class)); caps = Sets.newSet("a", "b"); helloMessage = NetconfHelloMessage.createServerHello(caps, 10); message = new NetconfMessage(helloMessage.getDocument()); @@ -48,6 +56,20 @@ public class SimpleNetconfClientSessionListenerTest { clientSession = new NetconfClientSession(sessionListener, channel, 20L, caps); } + private void mockEventLoop() { + final EventLoop eventLoop = mock(EventLoop.class); + doReturn(eventLoop).when(channel).eventLoop(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final Object[] args = invocation.getArguments(); + final Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + }).when(eventLoop).execute(any(Runnable.class)); + } + @Test public void testSessionDown() throws Exception { SimpleNetconfClientSessionListener simpleListener = new SimpleNetconfClientSessionListener(); diff --git a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/mapping/operations/DefaultCloseSessionTest.java b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/mapping/operations/DefaultCloseSessionTest.java index c999951a6c..f50fe52125 100644 --- a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/mapping/operations/DefaultCloseSessionTest.java +++ b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/mapping/operations/DefaultCloseSessionTest.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.verify; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoop; import io.netty.util.concurrent.GenericFutureListener; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -36,6 +37,21 @@ import org.w3c.dom.Document; public class DefaultCloseSessionTest { + private void mockEventLoop(final Channel channel) { + final EventLoop eventLoop = mock(EventLoop.class); + doReturn(eventLoop).when(channel).eventLoop(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final Object[] args = invocation.getArguments(); + final Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + }).when(eventLoop).execute(any(Runnable.class)); + doReturn(true).when(eventLoop).inEventLoop(); + } + @Test public void testDefaultCloseSession() throws Exception { AutoCloseable res = mock(AutoCloseable.class); @@ -45,7 +61,10 @@ public class DefaultCloseSessionTest { XmlElement elem = XmlElement.fromDomElement(XmlUtil.readXmlToElement("")); final Channel channel = mock(Channel.class); doReturn("channel").when(channel).toString(); - doReturn(mock(ChannelFuture.class)).when(channel).close(); + mockEventLoop(channel); + final ChannelFuture channelFuture = mock(ChannelFuture.class); + doReturn(channelFuture).when(channel).close(); + doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class)); final ChannelFuture sendFuture = mock(ChannelFuture.class); doAnswer(new Answer() { @@ -56,6 +75,7 @@ public class DefaultCloseSessionTest { } }).when(sendFuture).addListener(any(GenericFutureListener.class)); doReturn(sendFuture).when(channel).writeAndFlush(anyObject()); + doReturn(true).when(sendFuture).isSuccess(); final NetconfServerSessionListener listener = mock(NetconfServerSessionListener.class); doNothing().when(listener).onSessionTerminated(any(NetconfServerSession.class), any(NetconfTerminationReason.class)); final NetconfServerSession session = diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java index eb66d29020..e4c03ed50f 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java @@ -10,8 +10,11 @@ package org.opendaylight.netconf.nettyutil; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.DefaultChannelPromise; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import java.io.IOException; import org.opendaylight.controller.config.util.xml.XmlElement; import org.opendaylight.netconf.api.NetconfExiSession; @@ -63,13 +66,37 @@ public abstract class AbstractNetconfSession() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + proxyFuture.setSuccess(); + } else { + proxyFuture.setFailure(future.cause()); + } + } + }); + if (delayedEncoder != null) { + replaceMessageEncoder(delayedEncoder); + delayedEncoder = null; + } + } + }); + + return proxyFuture; } @Override diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionTest.java index e0d6939d23..b55e861ed3 100644 --- a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionTest.java +++ b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionTest.java @@ -11,6 +11,7 @@ package org.opendaylight.netconf.nettyutil; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -25,20 +26,24 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.concurrent.GenericFutureListener; import java.util.Collections; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opendaylight.netconf.util.messages.NetconfHelloMessage; -import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfSession; import org.opendaylight.netconf.api.NetconfSessionListener; import org.opendaylight.netconf.api.NetconfTerminationReason; import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage; +import org.opendaylight.netconf.util.messages.NetconfHelloMessage; +import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader; import org.openexi.proc.common.EXIOptions; public class AbstractNetconfSessionTest { @@ -49,6 +54,11 @@ public class AbstractNetconfSessionTest { private Channel channel; @Mock private ChannelPipeline pipeline; + @Mock + private EventLoop eventLoop; + @Mock + private ChannelFuture writeFuture; + private NetconfHelloMessage clientHello; @Before @@ -59,13 +69,26 @@ public class AbstractNetconfSessionTest { doNothing().when(listener).onSessionDown(any(NetconfSession.class), any(Exception.class)); doNothing().when(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class)); - doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class)); + doReturn(writeFuture).when(writeFuture).addListener(any(GenericFutureListener.class)); + + doReturn(writeFuture).when(channel).writeAndFlush(any(NetconfMessage.class)); doReturn(pipeline).when(channel).pipeline(); doReturn("mockChannel").when(channel).toString(); doReturn(mock(ChannelFuture.class)).when(channel).close(); doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class)); + doReturn(eventLoop).when(channel).eventLoop(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final Object[] args = invocation.getArguments(); + final Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + }).when(eventLoop).execute(any(Runnable.class)); + clientHello = NetconfHelloMessage.createClientHello(Collections.emptySet(), Optional.absent()); }