BUG 4547: Make sure all channel writes are from a netty thread. 92/32492/2
authorTomas Cere <tcere@cisco.com>
Wed, 13 Jan 2016 11:49:48 +0000 (12:49 +0100)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 19 Jan 2016 08:41:20 +0000 (08:41 +0000)
When we were in restconf thread some writes were executed from jetty thread,
we need to make sure all channel writes are coming from netty thread to
preserve message ordering.

Port of:
https://git.opendaylight.org/gerrit/#/c/32272/5

Change-Id: I19ba295905de5fce3cb84e6d5285a2ddf73c54c1
Signed-off-by: Tomas Cere <tcere@cisco.com>
opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorTest.java
opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/SimpleNetconfClientSessionListenerTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultCloseSessionTest.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java

index eb45a4dcd4b8075dbbdbeb7ce04709371d328407..d17fcea7bcbff6c02389a03e21d01f0f5b97336b 100644 (file)
@@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.EventLoop;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
@@ -61,6 +62,7 @@ public class NetconfClientSessionNegotiatorTest {
         pipeline = mockChannelPipeline();
         future = mockChannelFuture();
         channel = mockChannel();
+        mockEventLoop();
     }
 
     private static ChannelHandler mockChannelHandler() {
@@ -102,6 +104,20 @@ public class NetconfClientSessionNegotiatorTest {
         return pipeline;
     }
 
+    private void mockEventLoop() {
+        final EventLoop eventLoop = mock(EventLoop.class);
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+    }
+
     private NetconfClientSessionNegotiator createNetconfClientSessionNegotiator(final Promise<NetconfClientSession> promise,
                                                                                 final NetconfMessage startExi) {
         ChannelProgressivePromise progressivePromise = mock(ChannelProgressivePromise.class);
index 8ce1ef0db501c1986d8f54d410ab305af4c7423e..03f982043b2871528028d8a4900aed0bfca84d8f 100644 (file)
@@ -10,7 +10,9 @@ package org.opendaylight.controller.netconf.client;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -18,11 +20,15 @@ import static org.mockito.Mockito.verify;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.util.Set;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.collections.Sets;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
 
@@ -40,7 +46,9 @@ public class SimpleNetconfClientSessionListenerTest {
     public void setUp() throws Exception {
         channel = mock(Channel.class);
         channelFuture = mock(ChannelFuture.class);
+        mockEventLoop();
         doReturn(channelFuture).when(channel).writeAndFlush(anyObject());
+        doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class));
         caps = Sets.newSet("a", "b");
         helloMessage = NetconfHelloMessage.createServerHello(caps, 10);
         message = new NetconfMessage(helloMessage.getDocument());
@@ -48,6 +56,20 @@ public class SimpleNetconfClientSessionListenerTest {
         clientSession = new NetconfClientSession(sessionListener, channel, 20L, caps);
     }
 
+    private void mockEventLoop() {
+        final EventLoop eventLoop = mock(EventLoop.class);
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+    }
+
     @Test
     public void testSessionDown() throws Exception {
         SimpleNetconfClientSessionListener simpleListener = new SimpleNetconfClientSessionListener();
index 9afa76148e31967abd9dd041775c921b4f2ad742..6eeab8a5b499e438174da61573d686cc379c1c4c 100644 (file)
@@ -18,6 +18,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.GenericFutureListener;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -34,6 +35,21 @@ import org.w3c.dom.Document;
 
 public class DefaultCloseSessionTest {
 
+    private void mockEventLoop(final Channel channel) {
+        final EventLoop eventLoop = mock(EventLoop.class);
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+        doReturn(true).when(eventLoop).inEventLoop();
+    }
+
     @Test
     public void testDefaultCloseSession() throws Exception {
         AutoCloseable res = mock(AutoCloseable.class);
@@ -43,7 +59,10 @@ public class DefaultCloseSessionTest {
         XmlElement elem = XmlElement.fromDomElement(XmlUtil.readXmlToElement("<elem/>"));
         final Channel channel = mock(Channel.class);
         doReturn("channel").when(channel).toString();
-        doReturn(mock(ChannelFuture.class)).when(channel).close();
+        mockEventLoop(channel);
+        final ChannelFuture channelFuture = mock(ChannelFuture.class);
+        doReturn(channelFuture).when(channel).close();
+        doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class));
 
         final ChannelFuture sendFuture = mock(ChannelFuture.class);
         doAnswer(new Answer<Object>() {
@@ -54,6 +73,7 @@ public class DefaultCloseSessionTest {
             }
         }).when(sendFuture).addListener(any(GenericFutureListener.class));
         doReturn(sendFuture).when(channel).writeAndFlush(anyObject());
+        doReturn(true).when(sendFuture).isSuccess();
         final NetconfServerSessionListener listener = mock(NetconfServerSessionListener.class);
         doNothing().when(listener).onSessionTerminated(any(NetconfServerSession.class), any(NetconfTerminationReason.class));
         final NetconfServerSession session =
index a59b1a0d76bcdb01095e7c25e76377cd83ad019a..df5a2c9a6cdc04ab21dfd135381e8876f2b7a020 100644 (file)
@@ -10,8 +10,11 @@ package org.opendaylight.controller.netconf.nettyutil;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelPromise;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
 import java.io.IOException;
 import org.opendaylight.controller.netconf.api.NetconfExiSession;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
@@ -63,13 +66,37 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
 
     @Override
     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
-        final ChannelFuture future = channel.writeAndFlush(netconfMessage);
-        if (delayedEncoder != null) {
-            replaceMessageEncoder(delayedEncoder);
-            delayedEncoder = null;
-        }
-
-        return future;
+        // From: https://github.com/netty/netty/issues/3887
+        // Netty can provide "ordering" in the following situations:
+        // 1. You are doing all writes from the EventLoop thread; OR
+        // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
+        //
+        // Restconf writes to a netconf mountpoint execute multiple messages
+        // and one of these was executed from a restconf thread thus breaking ordering so
+        // we need to execute all messages from an EventLoop thread.
+        final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
+        channel.eventLoop().execute(new Runnable() {
+            @Override
+            public void run() {
+                final ChannelFuture future = channel.writeAndFlush(netconfMessage);
+                future.addListener(new FutureListener<Void>() {
+                    @Override
+                    public void operationComplete(Future<Void> future) throws Exception {
+                        if (future.isSuccess()) {
+                            proxyFuture.setSuccess();
+                        } else {
+                            proxyFuture.setFailure(future.cause());
+                        }
+                    }
+                });
+                if (delayedEncoder != null) {
+                    replaceMessageEncoder(delayedEncoder);
+                    delayedEncoder = null;
+                }
+            }
+        });
+
+        return proxyFuture;
     }
 
     @Override
index 3a1b26dd9e4dc0c44fedc890a78b9586c6512e29..bbfc3604bad7df1aefa7a0e439ae36b412cf704b 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.netconf.nettyutil;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -24,13 +25,17 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoop;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.NetconfSession;
 import org.opendaylight.controller.netconf.api.NetconfSessionListener;
@@ -48,6 +53,11 @@ public class AbstractNetconfSessionTest {
     private Channel channel;
     @Mock
     private ChannelPipeline pipeline;
+    @Mock
+    private EventLoop eventLoop;
+    @Mock
+    private ChannelFuture writeFuture;
+
     private NetconfHelloMessage clientHello;
 
     @Before
@@ -58,13 +68,26 @@ public class AbstractNetconfSessionTest {
         doNothing().when(listener).onSessionDown(any(NetconfSession.class), any(Exception.class));
         doNothing().when(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class));
 
-        doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class));
+        doReturn(writeFuture).when(writeFuture).addListener(any(GenericFutureListener.class));
+
+        doReturn(writeFuture).when(channel).writeAndFlush(any(NetconfMessage.class));
         doReturn(pipeline).when(channel).pipeline();
         doReturn("mockChannel").when(channel).toString();
         doReturn(mock(ChannelFuture.class)).when(channel).close();
 
         doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class));
 
+        doReturn(eventLoop).when(channel).eventLoop();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                final Object[] args = invocation.getArguments();
+                final Runnable runnable = (Runnable) args[0];
+                runnable.run();
+                return null;
+            }
+        }).when(eventLoop).execute(any(Runnable.class));
+
         clientHello = NetconfHelloMessage.createClientHello(Collections.<String>emptySet(), Optional.<NetconfHelloMessageAdditionalHeader>absent());
     }