From: Tomas Cere Date: Wed, 13 Jan 2016 11:49:48 +0000 (+0100) Subject: BUG 4547: Make sure all channel writes are from a netty thread. X-Git-Tag: release/lithium-sr4~17 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=235f43934e553d3bc8a6ed416902bbb601f525df;p=controller.git BUG 4547: Make sure all channel writes are from a netty thread. When we were in restconf thread some writes were executed from jetty thread, we need to make sure all channel writes are coming from netty thread to preserve message ordering. Port of: https://git.opendaylight.org/gerrit/#/c/32272/5 Change-Id: I19ba295905de5fce3cb84e6d5285a2ddf73c54c1 Signed-off-by: Tomas Cere --- diff --git a/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorTest.java b/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorTest.java index eb45a4dcd4..d17fcea7bc 100644 --- a/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorTest.java +++ b/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorTest.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelProgressivePromise; +import io.netty.channel.EventLoop; import io.netty.handler.ssl.SslHandler; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; @@ -61,6 +62,7 @@ public class NetconfClientSessionNegotiatorTest { pipeline = mockChannelPipeline(); future = mockChannelFuture(); channel = mockChannel(); + mockEventLoop(); } private static ChannelHandler mockChannelHandler() { @@ -102,6 +104,20 @@ public class NetconfClientSessionNegotiatorTest { return pipeline; } + private void mockEventLoop() { + final EventLoop eventLoop = mock(EventLoop.class); + doReturn(eventLoop).when(channel).eventLoop(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final Object[] args = invocation.getArguments(); + final Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + }).when(eventLoop).execute(any(Runnable.class)); + } + private NetconfClientSessionNegotiator createNetconfClientSessionNegotiator(final Promise promise, final NetconfMessage startExi) { ChannelProgressivePromise progressivePromise = mock(ChannelProgressivePromise.class); diff --git a/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/SimpleNetconfClientSessionListenerTest.java b/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/SimpleNetconfClientSessionListenerTest.java index 8ce1ef0db5..03f982043b 100644 --- a/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/SimpleNetconfClientSessionListenerTest.java +++ b/opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/SimpleNetconfClientSessionListenerTest.java @@ -10,7 +10,9 @@ package org.opendaylight.controller.netconf.client; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -18,11 +20,15 @@ import static org.mockito.Mockito.verify; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoop; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.util.Set; import org.junit.Before; import org.junit.Test; import org.mockito.internal.util.collections.Sets; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage; @@ -40,7 +46,9 @@ public class SimpleNetconfClientSessionListenerTest { public void setUp() throws Exception { channel = mock(Channel.class); channelFuture = mock(ChannelFuture.class); + mockEventLoop(); doReturn(channelFuture).when(channel).writeAndFlush(anyObject()); + doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class)); caps = Sets.newSet("a", "b"); helloMessage = NetconfHelloMessage.createServerHello(caps, 10); message = new NetconfMessage(helloMessage.getDocument()); @@ -48,6 +56,20 @@ public class SimpleNetconfClientSessionListenerTest { clientSession = new NetconfClientSession(sessionListener, channel, 20L, caps); } + private void mockEventLoop() { + final EventLoop eventLoop = mock(EventLoop.class); + doReturn(eventLoop).when(channel).eventLoop(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final Object[] args = invocation.getArguments(); + final Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + }).when(eventLoop).execute(any(Runnable.class)); + } + @Test public void testSessionDown() throws Exception { SimpleNetconfClientSessionListener simpleListener = new SimpleNetconfClientSessionListener(); diff --git a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultCloseSessionTest.java b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultCloseSessionTest.java index 9afa76148e..6eeab8a5b4 100644 --- a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultCloseSessionTest.java +++ b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultCloseSessionTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoop; import io.netty.util.concurrent.GenericFutureListener; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -34,6 +35,21 @@ import org.w3c.dom.Document; public class DefaultCloseSessionTest { + private void mockEventLoop(final Channel channel) { + final EventLoop eventLoop = mock(EventLoop.class); + doReturn(eventLoop).when(channel).eventLoop(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final Object[] args = invocation.getArguments(); + final Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + }).when(eventLoop).execute(any(Runnable.class)); + doReturn(true).when(eventLoop).inEventLoop(); + } + @Test public void testDefaultCloseSession() throws Exception { AutoCloseable res = mock(AutoCloseable.class); @@ -43,7 +59,10 @@ public class DefaultCloseSessionTest { XmlElement elem = XmlElement.fromDomElement(XmlUtil.readXmlToElement("")); final Channel channel = mock(Channel.class); doReturn("channel").when(channel).toString(); - doReturn(mock(ChannelFuture.class)).when(channel).close(); + mockEventLoop(channel); + final ChannelFuture channelFuture = mock(ChannelFuture.class); + doReturn(channelFuture).when(channel).close(); + doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class)); final ChannelFuture sendFuture = mock(ChannelFuture.class); doAnswer(new Answer() { @@ -54,6 +73,7 @@ public class DefaultCloseSessionTest { } }).when(sendFuture).addListener(any(GenericFutureListener.class)); doReturn(sendFuture).when(channel).writeAndFlush(anyObject()); + doReturn(true).when(sendFuture).isSuccess(); final NetconfServerSessionListener listener = mock(NetconfServerSessionListener.class); doNothing().when(listener).onSessionTerminated(any(NetconfServerSession.class), any(NetconfTerminationReason.class)); final NetconfServerSession session = diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java index a59b1a0d76..df5a2c9a6c 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java @@ -10,8 +10,11 @@ package org.opendaylight.controller.netconf.nettyutil; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.DefaultChannelPromise; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import java.io.IOException; import org.opendaylight.controller.netconf.api.NetconfExiSession; import org.opendaylight.controller.netconf.api.NetconfMessage; @@ -63,13 +66,37 @@ public abstract class AbstractNetconfSession() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + proxyFuture.setSuccess(); + } else { + proxyFuture.setFailure(future.cause()); + } + } + }); + if (delayedEncoder != null) { + replaceMessageEncoder(delayedEncoder); + delayedEncoder = null; + } + } + }); + + return proxyFuture; } @Override diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java index 3a1b26dd9e..bbfc3604ba 100644 --- a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java +++ b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.netconf.nettyutil; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -24,13 +25,17 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.concurrent.GenericFutureListener; import java.util.Collections; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfSession; import org.opendaylight.controller.netconf.api.NetconfSessionListener; @@ -48,6 +53,11 @@ public class AbstractNetconfSessionTest { private Channel channel; @Mock private ChannelPipeline pipeline; + @Mock + private EventLoop eventLoop; + @Mock + private ChannelFuture writeFuture; + private NetconfHelloMessage clientHello; @Before @@ -58,13 +68,26 @@ public class AbstractNetconfSessionTest { doNothing().when(listener).onSessionDown(any(NetconfSession.class), any(Exception.class)); doNothing().when(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class)); - doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class)); + doReturn(writeFuture).when(writeFuture).addListener(any(GenericFutureListener.class)); + + doReturn(writeFuture).when(channel).writeAndFlush(any(NetconfMessage.class)); doReturn(pipeline).when(channel).pipeline(); doReturn("mockChannel").when(channel).toString(); doReturn(mock(ChannelFuture.class)).when(channel).close(); doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class)); + doReturn(eventLoop).when(channel).eventLoop(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final Object[] args = invocation.getArguments(); + final Runnable runnable = (Runnable) args[0]; + runnable.run(); + return null; + } + }).when(eventLoop).execute(any(Runnable.class)); + clientHello = NetconfHelloMessage.createClientHello(Collections.emptySet(), Optional.absent()); }