BUG 4547: Make sure all channel writes are from a netty thread. 72/32272/5
authorTomas Cere <tcere@cisco.com>
Fri, 8 Jan 2016 12:09:02 +0000 (13:09 +0100)
committerTomas Cere <tcere@cisco.com>
Mon, 11 Jan 2016 09:47:37 +0000 (10:47 +0100)
When we were in restconf thread some writes were executed from jetty thread,
we need to make sure all channel writes are coming from netty thread to
preserve message ordering.

Change-Id: I6ca8dda82c11338b1ff590c808f12fa0def9ca33
Signed-off-by: Tomas Cere <tcere@cisco.com>
opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/NetconfClientSessionNegotiatorTest.java
opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/SimpleNetconfClientSessionListenerTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/netconf/impl/mapping/operations/DefaultCloseSessionTest.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionTest.java

index 7cda015bd417d449579dd1d2efcf6f79a167a66b..e3006c7bb1c8ebce4b6f5d891814d46624041178 100644 (file)
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.EventLoop;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
@@ -36,6 +37,8 @@ import org.junit.Test;
 import org.mockito.internal.util.collections.Sets;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.netconf.api.NetconfClientSessionPreferences;
+import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.nettyutil.handler.ChunkedFramingMechanismEncoder;
 import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder;
 import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
@@ -43,8 +46,6 @@ import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
 import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
 import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
 import org.opendaylight.netconf.util.test.XmlFileLoader;
-import org.opendaylight.netconf.api.NetconfClientSessionPreferences;
-import org.opendaylight.netconf.api.NetconfMessage;
 import org.openexi.proc.common.EXIOptions;
 import org.w3c.dom.Document;
 
@@ -62,6 +63,7 @@ public class NetconfClientSessionNegotiatorTest {
         pipeline = mockChannelPipeline();
         future = mockChannelFuture();
         channel = mockChannel();
+        mockEventLoop();
     }
 
     private static ChannelHandler mockChannelHandler() {
@@ -103,6 +105,20 @@ public class NetconfClientSessionNegotiatorTest {
         return pipeline;
     }
 
+    private void mockEventLoop() {
+        final EventLoop eventLoop = mock(EventLoop.class);
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+    }
+
     private NetconfClientSessionNegotiator createNetconfClientSessionNegotiator(final Promise<NetconfClientSession> promise,
                                                                                 final NetconfMessage startExi) {
         ChannelProgressivePromise progressivePromise = mock(ChannelProgressivePromise.class);
index f13d810fad0b4aa33623fa07bdfbcdd9ef8d3d93..0bc9b04d8ce337e082fedc4f7e488afa3baa74fc 100644 (file)
@@ -10,7 +10,9 @@ package org.opendaylight.netconf.client;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -18,11 +20,15 @@ import static org.mockito.Mockito.verify;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.util.Set;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.collections.Sets;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
 import org.opendaylight.netconf.api.NetconfMessage;
 
@@ -40,7 +46,9 @@ public class SimpleNetconfClientSessionListenerTest {
     public void setUp() throws Exception {
         channel = mock(Channel.class);
         channelFuture = mock(ChannelFuture.class);
+        mockEventLoop();
         doReturn(channelFuture).when(channel).writeAndFlush(anyObject());
+        doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class));
         caps = Sets.newSet("a", "b");
         helloMessage = NetconfHelloMessage.createServerHello(caps, 10);
         message = new NetconfMessage(helloMessage.getDocument());
@@ -48,6 +56,20 @@ public class SimpleNetconfClientSessionListenerTest {
         clientSession = new NetconfClientSession(sessionListener, channel, 20L, caps);
     }
 
+    private void mockEventLoop() {
+        final EventLoop eventLoop = mock(EventLoop.class);
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+    }
+
     @Test
     public void testSessionDown() throws Exception {
         SimpleNetconfClientSessionListener simpleListener = new SimpleNetconfClientSessionListener();
index c999951a6cd90c5b5010deb0f79da0a1445505e6..f50fe52125ed8f672f0d8408345f77b004c8e1bb 100644 (file)
@@ -19,6 +19,7 @@ import static org.mockito.Mockito.verify;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.GenericFutureListener;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -36,6 +37,21 @@ import org.w3c.dom.Document;
 
 public class DefaultCloseSessionTest {
 
+    private void mockEventLoop(final Channel channel) {
+        final EventLoop eventLoop = mock(EventLoop.class);
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+        doReturn(true).when(eventLoop).inEventLoop();
+    }
+
     @Test
     public void testDefaultCloseSession() throws Exception {
         AutoCloseable res = mock(AutoCloseable.class);
@@ -45,7 +61,10 @@ public class DefaultCloseSessionTest {
         XmlElement elem = XmlElement.fromDomElement(XmlUtil.readXmlToElement("<elem/>"));
         final Channel channel = mock(Channel.class);
         doReturn("channel").when(channel).toString();
-        doReturn(mock(ChannelFuture.class)).when(channel).close();
+        mockEventLoop(channel);
+        final ChannelFuture channelFuture = mock(ChannelFuture.class);
+        doReturn(channelFuture).when(channel).close();
+        doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class));
 
         final ChannelFuture sendFuture = mock(ChannelFuture.class);
         doAnswer(new Answer<Object>() {
@@ -56,6 +75,7 @@ public class DefaultCloseSessionTest {
             }
         }).when(sendFuture).addListener(any(GenericFutureListener.class));
         doReturn(sendFuture).when(channel).writeAndFlush(anyObject());
+        doReturn(true).when(sendFuture).isSuccess();
         final NetconfServerSessionListener listener = mock(NetconfServerSessionListener.class);
         doNothing().when(listener).onSessionTerminated(any(NetconfServerSession.class), any(NetconfTerminationReason.class));
         final NetconfServerSession session =
index eb66d2902047bf9fb2a1f858e18323059f6e8e18..e4c03ed50f43b518622957e179715e87f8f36814 100644 (file)
@@ -10,8 +10,11 @@ package org.opendaylight.netconf.nettyutil;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelPromise;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
 import java.io.IOException;
 import org.opendaylight.controller.config.util.xml.XmlElement;
 import org.opendaylight.netconf.api.NetconfExiSession;
@@ -63,13 +66,37 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
 
     @Override
     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
-        final ChannelFuture future = channel.writeAndFlush(netconfMessage);
-        if (delayedEncoder != null) {
-            replaceMessageEncoder(delayedEncoder);
-            delayedEncoder = null;
-        }
-
-        return future;
+        // From: https://github.com/netty/netty/issues/3887
+        // Netty can provide "ordering" in the following situations:
+        // 1. You are doing all writes from the EventLoop thread; OR
+        // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
+        //
+        // Restconf writes to a netconf mountpoint execute multiple messages
+        // and one of these was executed from a restconf thread thus breaking ordering so
+        // we need to execute all messages from an EventLoop thread.
+        final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
+        channel.eventLoop().execute(new Runnable() {
+            @Override
+            public void run() {
+                final ChannelFuture future = channel.writeAndFlush(netconfMessage);
+                future.addListener(new FutureListener<Void>() {
+                    @Override
+                    public void operationComplete(Future<Void> future) throws Exception {
+                        if (future.isSuccess()) {
+                            proxyFuture.setSuccess();
+                        } else {
+                            proxyFuture.setFailure(future.cause());
+                        }
+                    }
+                });
+                if (delayedEncoder != null) {
+                    replaceMessageEncoder(delayedEncoder);
+                    delayedEncoder = null;
+                }
+            }
+        });
+
+        return proxyFuture;
     }
 
     @Override
index e0d6939d233a962f96eb15a2d0a3b6d10ddc3fcb..b55e861ed34fadb94692c40cfdb39346052387bc 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.netconf.nettyutil;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -25,20 +26,24 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoop;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
-import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.NetconfSession;
 import org.opendaylight.netconf.api.NetconfSessionListener;
 import org.opendaylight.netconf.api.NetconfTerminationReason;
 import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
+import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
 import org.openexi.proc.common.EXIOptions;
 
 public class AbstractNetconfSessionTest {
@@ -49,6 +54,11 @@ public class AbstractNetconfSessionTest {
     private Channel channel;
     @Mock
     private ChannelPipeline pipeline;
+    @Mock
+    private EventLoop eventLoop;
+    @Mock
+    private ChannelFuture writeFuture;
+
     private NetconfHelloMessage clientHello;
 
     @Before
@@ -59,13 +69,26 @@ public class AbstractNetconfSessionTest {
         doNothing().when(listener).onSessionDown(any(NetconfSession.class), any(Exception.class));
         doNothing().when(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class));
 
-        doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class));
+        doReturn(writeFuture).when(writeFuture).addListener(any(GenericFutureListener.class));
+
+        doReturn(writeFuture).when(channel).writeAndFlush(any(NetconfMessage.class));
         doReturn(pipeline).when(channel).pipeline();
         doReturn("mockChannel").when(channel).toString();
         doReturn(mock(ChannelFuture.class)).when(channel).close();
 
         doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class));
 
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+
         clientHello = NetconfHelloMessage.createClientHello(Collections.<String>emptySet(), Optional.<NetconfHelloMessageAdditionalHeader>absent());
     }