Make Netty multi-threaded in southbound 74/47774/2
authorLorand Jakab <lojakab@cisco.com>
Thu, 27 Oct 2016 04:40:06 +0000 (07:40 +0300)
committerLorand Jakab <lojakab@cisco.com>
Mon, 31 Oct 2016 11:12:08 +0000 (13:12 +0200)
Change-Id: I0790af750be50e3f04997c4157279b3d69594d83
Signed-off-by: Lorand Jakab <lojakab@cisco.com>
mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/LispSouthboundPlugin.java
mappingservice/southbound/src/test/java/org/opendaylight/lispflowmapping/southbound/LispSouthboundPluginTest.java

index 246763348c49bd4af93ce50aa243999bb914ea98..d9a5e9f7f6a8751bfd0051e0b3033fd7f585af4f 100644 (file)
@@ -16,11 +16,14 @@ import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
 import io.netty.channel.epoll.EpollDatagramChannel;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -69,7 +72,8 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
     private LispSouthboundHandler lispSouthboundHandler;
     private LispXtrSouthboundHandler lispXtrSouthboundHandler;
     private NotificationPublishService notificationPublishService;
-    private Channel channel;
+    private int numChannels = 1;
+    private Channel[] channel;
     private Channel xtrChannel;
     private Class channelType;
     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
@@ -90,6 +94,10 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
         this.notificationPublishService = notificationPublishService;
         this.clusterSingletonService = clusterSingletonService;
         this.clusterSingletonService.registerClusterSingletonService(this);
+        if (Epoll.isAvailable()) {
+            numChannels = 5;
+        }
+        channel = new Channel[numChannels];
     }
 
     public void init() {
@@ -114,8 +122,10 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
             lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
 
             if (Epoll.isAvailable()) {
-                eventLoopGroup = new EpollEventLoopGroup(0, threadFactory);
+                eventLoopGroup = new EpollEventLoopGroup(numChannels, threadFactory);
                 channelType = EpollDatagramChannel.class;
+                bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+                bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
                 LOG.debug("Using Netty Epoll for UDP sockets");
             } else {
                 eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
@@ -141,7 +151,9 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void start() {
         try {
-            channel = bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
+            for (int i = 0; i < numChannels; ++i) {
+                channel[i] = bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
+            }
             LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM);
         } catch (Exception e) {
             LOG.error("Failed to open main socket ", e);
@@ -163,8 +175,10 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void stop() {
         try {
-            channel.close().sync();
-            channel = null;
+            for (int i = 0; i < numChannels; ++i) {
+                channel[i].close().sync();
+                channel[i] = null;
+            }
         } catch (Exception e) {
             LOG.error("Failed to close main socket ", e);
         }
@@ -213,7 +227,7 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
     public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
             final MessageType packetType, final int portNumber, Channel senderChannel) {
         if (senderChannel == null) {
-            senderChannel = this.channel;
+            senderChannel = this.channel[0];
         }
         InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
         outBuffer.position(0);
index 0e3d790e5a6b93d4887f08f08eddda9e786c3d3e..e77df27fe6b4c0f06a9a448ce66f4056f96caf52 100644 (file)
@@ -12,6 +12,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.DatagramPacket;
@@ -203,13 +204,14 @@ public class LispSouthboundPluginTest {
         Mockito.verify(handlerMock).close();
         assertNull(getField("lispSouthboundHandler"));
         assertNull(getField("lispXtrSouthboundHandler"));
-        assertNull(getField("channel"));
+        Channel[] channel = getField("channel");
+        assertNull(channel[0]);
     }
 
     private static void injectChannel() throws NoSuchFieldException, IllegalAccessException {
         final Field channelField = LispSouthboundPlugin.class.getDeclaredField("channel");
         channelField.setAccessible(true);
-        channelField.set(lispSouthboundPlugin, channel);
+        channelField.set(lispSouthboundPlugin, new Channel[] { channel });
     }
 
     private static void injectXtrChannel() throws NoSuchFieldException, IllegalAccessException {