Merge "BUG 4547: Make sure all channel writes are from a netty thread."
authorMaros Marsalek <mmarsale@cisco.com>
Wed, 13 Jan 2016 10:13:55 +0000 (10:13 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 13 Jan 2016 10:13:55 +0000 (10:13 +0000)
opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/NetconfClientSessionNegotiatorTest.java
opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/SimpleNetconfClientSessionListenerTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/mapping/operations/DefaultCloseSessionTest.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionTest.java

index 7cda015bd417d449579dd1d2efcf6f79a167a66b..e3006c7bb1c8ebce4b6f5d891814d46624041178 100644 (file)
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.EventLoop;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
@@ -36,6 +37,8 @@ import org.junit.Test;
 import org.mockito.internal.util.collections.Sets;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.netconf.api.NetconfClientSessionPreferences;
+import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.nettyutil.handler.ChunkedFramingMechanismEncoder;
 import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder;
 import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
@@ -43,8 +46,6 @@ import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
 import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
 import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
 import org.opendaylight.netconf.util.test.XmlFileLoader;
-import org.opendaylight.netconf.api.NetconfClientSessionPreferences;
-import org.opendaylight.netconf.api.NetconfMessage;
 import org.openexi.proc.common.EXIOptions;
 import org.w3c.dom.Document;
 
@@ -62,6 +63,7 @@ public class NetconfClientSessionNegotiatorTest {
         pipeline = mockChannelPipeline();
         future = mockChannelFuture();
         channel = mockChannel();
+        mockEventLoop();
     }
 
     private static ChannelHandler mockChannelHandler() {
@@ -103,6 +105,20 @@ public class NetconfClientSessionNegotiatorTest {
         return pipeline;
     }
 
+    private void mockEventLoop() {
+        final EventLoop eventLoop = mock(EventLoop.class);
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+    }
+
     private NetconfClientSessionNegotiator createNetconfClientSessionNegotiator(final Promise<NetconfClientSession> promise,
                                                                                 final NetconfMessage startExi) {
         ChannelProgressivePromise progressivePromise = mock(ChannelProgressivePromise.class);
index f13d810fad0b4aa33623fa07bdfbcdd9ef8d3d93..0bc9b04d8ce337e082fedc4f7e488afa3baa74fc 100644 (file)
@@ -10,7 +10,9 @@ package org.opendaylight.netconf.client;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -18,11 +20,15 @@ import static org.mockito.Mockito.verify;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.util.Set;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.collections.Sets;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
 import org.opendaylight.netconf.api.NetconfMessage;
 
@@ -40,7 +46,9 @@ public class SimpleNetconfClientSessionListenerTest {
     public void setUp() throws Exception {
         channel = mock(Channel.class);
         channelFuture = mock(ChannelFuture.class);
+        mockEventLoop();
         doReturn(channelFuture).when(channel).writeAndFlush(anyObject());
+        doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class));
         caps = Sets.newSet("a", "b");
         helloMessage = NetconfHelloMessage.createServerHello(caps, 10);
         message = new NetconfMessage(helloMessage.getDocument());
@@ -48,6 +56,20 @@ public class SimpleNetconfClientSessionListenerTest {
         clientSession = new NetconfClientSession(sessionListener, channel, 20L, caps);
     }
 
+    private void mockEventLoop() {
+        final EventLoop eventLoop = mock(EventLoop.class);
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+    }
+
     @Test
     public void testSessionDown() throws Exception {
         SimpleNetconfClientSessionListener simpleListener = new SimpleNetconfClientSessionListener();
index c999951a6cd90c5b5010deb0f79da0a1445505e6..f50fe52125ed8f672f0d8408345f77b004c8e1bb 100644 (file)
@@ -19,6 +19,7 @@ import static org.mockito.Mockito.verify;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.GenericFutureListener;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -36,6 +37,21 @@ import org.w3c.dom.Document;
 
 public class DefaultCloseSessionTest {
 
+    private void mockEventLoop(final Channel channel) {
+        final EventLoop eventLoop = mock(EventLoop.class);
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+        doReturn(true).when(eventLoop).inEventLoop();
+    }
+
     @Test
     public void testDefaultCloseSession() throws Exception {
         AutoCloseable res = mock(AutoCloseable.class);
@@ -45,7 +61,10 @@ public class DefaultCloseSessionTest {
         XmlElement elem = XmlElement.fromDomElement(XmlUtil.readXmlToElement("<elem/>"));
         final Channel channel = mock(Channel.class);
         doReturn("channel").when(channel).toString();
-        doReturn(mock(ChannelFuture.class)).when(channel).close();
+        mockEventLoop(channel);
+        final ChannelFuture channelFuture = mock(ChannelFuture.class);
+        doReturn(channelFuture).when(channel).close();
+        doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class));
 
         final ChannelFuture sendFuture = mock(ChannelFuture.class);
         doAnswer(new Answer<Object>() {
@@ -56,6 +75,7 @@ public class DefaultCloseSessionTest {
             }
         }).when(sendFuture).addListener(any(GenericFutureListener.class));
         doReturn(sendFuture).when(channel).writeAndFlush(anyObject());
+        doReturn(true).when(sendFuture).isSuccess();
         final NetconfServerSessionListener listener = mock(NetconfServerSessionListener.class);
         doNothing().when(listener).onSessionTerminated(any(NetconfServerSession.class), any(NetconfTerminationReason.class));
         final NetconfServerSession session =
index eb66d2902047bf9fb2a1f858e18323059f6e8e18..e4c03ed50f43b518622957e179715e87f8f36814 100644 (file)
@@ -10,8 +10,11 @@ package org.opendaylight.netconf.nettyutil;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelPromise;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
 import java.io.IOException;
 import org.opendaylight.controller.config.util.xml.XmlElement;
 import org.opendaylight.netconf.api.NetconfExiSession;
@@ -63,13 +66,37 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
 
     @Override
     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
-        final ChannelFuture future = channel.writeAndFlush(netconfMessage);
-        if (delayedEncoder != null) {
-            replaceMessageEncoder(delayedEncoder);
-            delayedEncoder = null;
-        }
-
-        return future;
+        // From: https://github.com/netty/netty/issues/3887
+        // Netty can provide "ordering" in the following situations:
+        // 1. You are doing all writes from the EventLoop thread; OR
+        // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
+        //
+        // Restconf writes to a netconf mountpoint execute multiple messages
+        // and one of these was executed from a restconf thread thus breaking ordering so
+        // we need to execute all messages from an EventLoop thread.
+        final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
+        channel.eventLoop().execute(new Runnable() {
+            @Override
+            public void run() {
+                final ChannelFuture future = channel.writeAndFlush(netconfMessage);
+                future.addListener(new FutureListener<Void>() {
+                    @Override
+                    public void operationComplete(Future<Void> future) throws Exception {
+                        if (future.isSuccess()) {
+                            proxyFuture.setSuccess();
+                        } else {
+                            proxyFuture.setFailure(future.cause());
+                        }
+                    }
+                });
+                if (delayedEncoder != null) {
+                    replaceMessageEncoder(delayedEncoder);
+                    delayedEncoder = null;
+                }
+            }
+        });
+
+        return proxyFuture;
     }
 
     @Override
index e0d6939d233a962f96eb15a2d0a3b6d10ddc3fcb..b55e861ed34fadb94692c40cfdb39346052387bc 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.netconf.nettyutil;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -25,20 +26,24 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoop;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
-import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.NetconfSession;
 import org.opendaylight.netconf.api.NetconfSessionListener;
 import org.opendaylight.netconf.api.NetconfTerminationReason;
 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
+import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
 import org.openexi.proc.common.EXIOptions;
 
 public class AbstractNetconfSessionTest {
@@ -49,6 +54,11 @@ public class AbstractNetconfSessionTest {
     private Channel channel;
     @Mock
     private ChannelPipeline pipeline;
+    @Mock
+    private EventLoop eventLoop;
+    @Mock
+    private ChannelFuture writeFuture;
+
     private NetconfHelloMessage clientHello;
 
     @Before
@@ -59,13 +69,26 @@ public class AbstractNetconfSessionTest {
         doNothing().when(listener).onSessionDown(any(NetconfSession.class), any(Exception.class));
         doNothing().when(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class));
 
-        doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class));
+        doReturn(writeFuture).when(writeFuture).addListener(any(GenericFutureListener.class));
+
+        doReturn(writeFuture).when(channel).writeAndFlush(any(NetconfMessage.class));
         doReturn(pipeline).when(channel).pipeline();
         doReturn("mockChannel").when(channel).toString();
         doReturn(mock(ChannelFuture.class)).when(channel).close();
 
         doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class));
 
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+
         clientHello = NetconfHelloMessage.createClientHello(Collections.<String>emptySet(), Optional.<NetconfHelloMessageAdditionalHeader>absent());
     }