BUG 4547: Make sure all channel writes are from a netty thread.
[netconf.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSession.java
index eb66d2902047bf9fb2a1f858e18323059f6e8e18..e4c03ed50f43b518622957e179715e87f8f36814 100644 (file)
@@ -10,8 +10,11 @@ package org.opendaylight.netconf.nettyutil;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelPromise;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
 import java.io.IOException;
 import org.opendaylight.controller.config.util.xml.XmlElement;
 import org.opendaylight.netconf.api.NetconfExiSession;
@@ -63,13 +66,37 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
 
     @Override
     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
-        final ChannelFuture future = channel.writeAndFlush(netconfMessage);
-        if (delayedEncoder != null) {
-            replaceMessageEncoder(delayedEncoder);
-            delayedEncoder = null;
-        }
-
-        return future;
+        // From: https://github.com/netty/netty/issues/3887
+        // Netty can provide "ordering" in the following situations:
+        // 1. You are doing all writes from the EventLoop thread; OR
+        // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
+        //
+        // Restconf writes to a netconf mountpoint execute multiple messages
+        // and one of these was executed from a restconf thread thus breaking ordering so
+        // we need to execute all messages from an EventLoop thread.
+        final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
+        channel.eventLoop().execute(new Runnable() {
+            @Override
+            public void run() {
+                final ChannelFuture future = channel.writeAndFlush(netconfMessage);
+                future.addListener(new FutureListener<Void>() {
+                    @Override
+                    public void operationComplete(Future<Void> future) throws Exception {
+                        if (future.isSuccess()) {
+                            proxyFuture.setSuccess();
+                        } else {
+                            proxyFuture.setFailure(future.cause());
+                        }
+                    }
+                });
+                if (delayedEncoder != null) {
+                    replaceMessageEncoder(delayedEncoder);
+                    delayedEncoder = null;
+                }
+            }
+        });
+
+        return proxyFuture;
     }
 
     @Override