Bug 3230 - Attempt to use Epoll native transport if available 92/36792/2
authorDileep <dileep.ranganathan@intel.com>
Mon, 28 Mar 2016 14:49:12 +0000 (07:49 -0700)
committerDileep Ranganathan <dileep.ranganathan@intel.com>
Thu, 14 Apr 2016 17:00:37 +0000 (17:00 +0000)
Attempts to use Epoll native transport if available.
Uses Epoll.isAvailable() to check availability.
Refer : http://netty.io/wiki/native-transports.html

Change-Id: I0019a084b0f2410f4cea7d5541fe0e5b49699bec
Depends-on: I073093f9a7b28de9890a6842f8bef72d4fdf6872
Signed-off-by: Dileep Ranganathan <dileep.ranganathan@intel.com>
12 files changed:
features/pom.xml
openflow-protocol-impl/pom.xml
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/SwitchConnectionProviderImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpConnectionInitializer.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/UdpChannelInitializer.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/UdpHandler.java
openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java
openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/connection/SwitchConnectionProviderImplTest.java
openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/connection/UdpHandlerTest.java
simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/SimpleClientInitializer.java
simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/UdpSimpleClientInitializer.java

index 62dc813d3700164dc36a724ed1197b74babedeaa..f35b4d8acd5bcf9bfb308a77e251f5857c17e9ac 100644 (file)
       <groupId>io.netty</groupId>
       <artifactId>netty-transport</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-epoll</artifactId>
+      <!-- Explicitly bring in the linux classifier, test may fail on 32-bit linux -->
+      <classifier>linux-x86_64</classifier>
+    </dependency>
   </dependencies>
 
 </project>
index a125a9310a238ba25d001ce37156b72217e72da9..0c3fc442bff374c121123a4fa8d3b13b82d6f8b2 100644 (file)
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>config-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-epoll</artifactId>
+            <!-- Explicitly bring in the linux classifier, test may fail on 32-bit linux -->
+            <classifier>linux-x86_64</classifier>
+        </dependency>
     </dependencies>
 </project>
index aa79e6a361b48384fda80819dcf4fb402a376879..411a9b4323e30dad162749c8c277799cf7dee777 100644 (file)
@@ -11,7 +11,8 @@ package org.opendaylight.openflowjava.protocol.impl.core;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
 import org.opendaylight.openflowjava.protocol.api.extensibility.DeserializerRegistry;
@@ -134,18 +135,24 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
         factory.setDeserializationFactory(deserializationFactory);
         factory.setUseBarrier(connConfig.useBarrier());
         final TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
+
+        // Check if Epoll native transport is available.
+        // TODO : Add option to disable Epoll.
+        boolean isEpollEnabled = Epoll.isAvailable();
+
         if (transportProtocol.equals(TransportProtocol.TCP) || transportProtocol.equals(TransportProtocol.TLS)) {
             server = new TcpHandler(connConfig.getAddress(), connConfig.getPort());
             final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
             ((TcpHandler) server).setChannelInitializer(channelInitializer);
-            ((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration());
+            ((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
 
-            final NioEventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup();
-            connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler);
+            final EventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup();
+            connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, isEpollEnabled);
             connectionInitializer.setChannelInitializer(channelInitializer);
             connectionInitializer.run();
         } else if (transportProtocol.equals(TransportProtocol.UDP)){
             server = new UdpHandler(connConfig.getAddress(), connConfig.getPort());
+            ((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
             ((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
         } else {
             throw new IllegalStateException("Unknown transport protocol received: " + transportProtocol);
index c4b0937e4c1068bd80aaadc0499bf90423f54aa4..c5905d60800f0908297a21442a6016a2d2230a22 100644 (file)
@@ -2,9 +2,8 @@ package org.opendaylight.openflowjava.protocol.impl.core;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-
 import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,21 +27,28 @@ public class TcpConnectionInitializer implements ServerFacade,
 
     private TcpChannelInitializer channelInitializer;
     private Bootstrap b;
+    private boolean isEpollEnabled;
 
     /**
      * Constructor
      * @param workerGroup - shared worker group
      */
-    public TcpConnectionInitializer(NioEventLoopGroup workerGroup) {
+    public TcpConnectionInitializer(EventLoopGroup workerGroup, boolean isEpollEnabled) {
         Preconditions.checkNotNull(workerGroup, "WorkerGroup can't be null");
         this.workerGroup = workerGroup;
+        this.isEpollEnabled = isEpollEnabled;
     }
 
     @Override
     public void run() {
         b = new Bootstrap();
-        b.group(workerGroup).channel(NioSocketChannel.class)
-            .handler(channelInitializer);
+        if(isEpollEnabled) {
+            b.group(workerGroup).channel(EpollSocketChannel.class)
+                    .handler(channelInitializer);
+        } else {
+            b.group(workerGroup).channel(NioSocketChannel.class)
+                    .handler(channelInitializer);
+        }
     }
 
     @Override
index ff4bc6a1a27eb2d5f15367180e592ab2db256d46..00a3fd71ffde83727681c96326523f3c28495208 100644 (file)
@@ -13,11 +13,17 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
@@ -50,13 +56,15 @@ public class TcpHandler implements ServerFacade {
     private int port;
     private String address;
     private final InetAddress startupAddress;
-    private NioEventLoopGroup workerGroup;
-    private NioEventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+    private EventLoopGroup bossGroup;
     private final SettableFuture<Boolean> isOnlineFuture;
     private ThreadConfiguration threadConfig;
 
     private TcpChannelInitializer channelInitializer;
 
+    private Class<? extends ServerSocketChannel> socketChannelClass;
+
     /**
      * Constructor of TCPHandler that listens on selected port.
      *
@@ -90,13 +98,13 @@ public class TcpHandler implements ServerFacade {
          * Any other setting means netty will measure the time it spent selecting
          * and spend roughly proportional time executing tasks.
          */
-        workerGroup.setIoRatio(100);
+        //workerGroup.setIoRatio(100);
 
         final ChannelFuture f;
         try {
             ServerBootstrap b = new ServerBootstrap();
             b.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
+                    .channel(socketChannelClass)
                     .handler(new LoggingHandler(LogLevel.DEBUG))
                     .childHandler(channelInitializer)
                     .option(ChannelOption.SO_BACKLOG, 128)
@@ -202,7 +210,21 @@ public class TcpHandler implements ServerFacade {
      * Initiate event loop groups
      * @param threadConfiguration number of threads to be created, if not specified in threadConfig
      */
-    public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration) {
+    public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration, boolean isEpollEnabled) {
+
+        if(isEpollEnabled) {
+            initiateEpollEventLoopGroups(threadConfiguration);
+        } else {
+            initiateNioEventLoopGroups(threadConfiguration);
+        }
+    }
+
+    /**
+     * Initiate Nio event loop groups
+     * @param threadConfiguration number of threads to be created, if not specified in threadConfig
+     */
+    public void initiateNioEventLoopGroups(ThreadConfiguration threadConfiguration) {
+        socketChannelClass = NioServerSocketChannel.class;
         if (threadConfiguration != null) {
             bossGroup = new NioEventLoopGroup(threadConfiguration.getBossThreadCount());
             workerGroup = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
@@ -210,12 +232,37 @@ public class TcpHandler implements ServerFacade {
             bossGroup = new NioEventLoopGroup();
             workerGroup = new NioEventLoopGroup();
         }
+        ((NioEventLoopGroup)workerGroup).setIoRatio(100);
+    }
+
+    /**
+     * Initiate Epoll event loop groups with Nio as fall back
+     * @param threadConfiguration
+     */
+    protected void initiateEpollEventLoopGroups(ThreadConfiguration threadConfiguration) {
+        try {
+            socketChannelClass = EpollServerSocketChannel.class;
+            if (threadConfiguration != null) {
+                    bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
+                workerGroup = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
+            } else {
+                bossGroup = new EpollEventLoopGroup();
+                workerGroup = new EpollEventLoopGroup();
+            }
+            ((EpollEventLoopGroup)workerGroup).setIoRatio(100);
+            return;
+        } catch (Throwable ex) {
+            LOGGER.debug("Epoll initiation failed");
+        }
+
+        //Fallback mechanism
+        initiateNioEventLoopGroups(threadConfiguration);
     }
 
     /**
      * @return workerGroup
      */
-    public NioEventLoopGroup getWorkerGroup() {
+    public EventLoopGroup getWorkerGroup() {
         return workerGroup;
     }
 
index ba1650e53565d6628957444ace04d5b6e230a572..ccb8b06c7c95e8dc1fba73a15770135f5724477a 100644 (file)
@@ -8,16 +8,16 @@
 
 package org.opendaylight.openflowjava.protocol.impl.core;
 
-import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.DatagramChannel;
 
 /**
  * @author michal.polkorab
  *
  */
-public class UdpChannelInitializer extends ProtocolChannelInitializer<NioDatagramChannel> {
+public class UdpChannelInitializer extends ProtocolChannelInitializer<DatagramChannel> {
 
     @Override
-    protected void initChannel(NioDatagramChannel ch) throws Exception {
+    protected void initChannel(DatagramChannel ch) throws Exception {
         ch.pipeline().addLast(PipelineHandlers.OF_DATAGRAMPACKET_HANDLER.name(),
                 new OFDatagramPacketHandler(getSwitchConnectionHandler()));
         OFDatagramPacketDecoder ofDatagramPacketDecoder = new OFDatagramPacketDecoder();
index 3e6e384e66c83a408e52152f1b569a4a8b731a09..9339ba1664f7781a914f33f5ed0becb84edf562c 100644 (file)
@@ -12,7 +12,10 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollDatagramChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramChannel;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
@@ -41,6 +44,7 @@ public final class UdpHandler implements ServerFacade {
     private final SettableFuture<Boolean> isOnlineFuture;
     private UdpChannelInitializer channelInitializer;
     private ThreadConfiguration threadConfig;
+    private Class<? extends DatagramChannel> datagramChannelClass;
 
     /**
      * Constructor of UdpHandler that listens on selected port.
@@ -64,16 +68,11 @@ public final class UdpHandler implements ServerFacade {
 
     @Override
     public void run() {
-        if (threadConfig != null) {
-            group = new NioEventLoopGroup(threadConfig.getWorkerThreadCount());
-        } else {
-            group = new NioEventLoopGroup();
-        }
         final ChannelFuture f;
         try {
             Bootstrap b = new Bootstrap();
             b.group(group)
-             .channel(NioDatagramChannel.class)
+             .channel(datagramChannelClass)
              .option(ChannelOption.SO_BROADCAST, false)
              .handler(channelInitializer);
 
@@ -146,4 +145,51 @@ public final class UdpHandler implements ServerFacade {
     public void setThreadConfig(ThreadConfiguration threadConfig) {
         this.threadConfig = threadConfig;
     }
+
+    /**
+     * Initiate event loop groups
+     * @param threadConfiguration number of threads to be created, if not specified in threadConfig
+     */
+    public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration, boolean isEpollEnabled) {
+
+        if(isEpollEnabled) {
+            initiateEpollEventLoopGroups(threadConfiguration);
+        } else {
+            initiateNioEventLoopGroups(threadConfiguration);
+        }
+    }
+
+    /**
+     * Initiate Nio event loop groups
+     * @param threadConfiguration number of threads to be created, if not specified in threadConfig
+     */
+    public void initiateNioEventLoopGroups(ThreadConfiguration threadConfiguration) {
+        datagramChannelClass = NioDatagramChannel.class;
+        if (threadConfiguration != null) {
+            group = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
+        } else {
+            group = new NioEventLoopGroup();
+        }
+    }
+
+    /**
+     * Initiate Epoll event loop groups with Nio as fall back
+     * @param threadConfiguration
+     */
+    protected void initiateEpollEventLoopGroups(ThreadConfiguration threadConfiguration) {
+        try {
+            datagramChannelClass = EpollDatagramChannel.class;
+            if (threadConfiguration != null) {
+                group = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
+            } else {
+                group = new EpollEventLoopGroup();
+            }
+            return;
+        } catch (Throwable ex) {
+            LOGGER.debug("Epoll initiation failed");
+        }
+
+        //Fallback mechanism
+        initiateNioEventLoopGroups(threadConfiguration);
+    }
 }
\ No newline at end of file
index 6d103355904315729bafec4ac775f3f2b85f4c95..6faad908f01cf60d798a67d6e263376561a0d583 100644 (file)
@@ -18,6 +18,7 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.concurrent.ExecutionException;
 
+import io.netty.channel.unix.Errors;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -62,7 +63,25 @@ public class TcpHandlerTest {
         tcpHandler = new TcpHandler(null, 0);
         tcpHandler.setChannelInitializer(mockChannelInitializer);
 
-        assertEquals("failed to start server", true, startupServer()) ;
+        assertEquals("failed to start server", true, startupServer(false)) ;
+        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort())) ;
+        shutdownServer();
+    }
+
+    /**
+     * Test run with null address set on Epoll native transport
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
+    @Test
+    public void testRunWithNullAddressOnEpoll() throws IOException, InterruptedException, ExecutionException  {
+
+        tcpHandler = new TcpHandler(null, 0);
+        tcpHandler.setChannelInitializer(mockChannelInitializer);
+
+        //Use Epoll native transport
+        assertEquals("failed to start server", true, startupServer(true)) ;
         assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort())) ;
         shutdownServer();
     }
@@ -79,11 +98,29 @@ public class TcpHandlerTest {
         tcpHandler = new TcpHandler(serverAddress, 0);
         tcpHandler.setChannelInitializer(mockChannelInitializer);
 
-        assertEquals("failed to start server", true, startupServer()) ;
+        assertEquals("failed to start server", true, startupServer(false)) ;
         assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort())) ;
         shutdownServer();
     }
 
+    /**
+     * Test run with address set on Epoll native transport
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
+    @Test
+    public void testRunWithAddressOnEpoll() throws IOException, InterruptedException, ExecutionException  {
+
+        tcpHandler = new TcpHandler(serverAddress, 0);
+        tcpHandler.setChannelInitializer(mockChannelInitializer);
+
+        //Use Epoll native transport
+        assertEquals("failed to start server", true, startupServer(true));
+        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort()));
+        shutdownServer();
+    }
+
     /**
      * Test run with encryption
      * @throws InterruptedException
@@ -91,17 +128,40 @@ public class TcpHandlerTest {
      * @throws ExecutionException
      */
     @Test
-    public void testRunWithEncryption () throws InterruptedException, IOException, ExecutionException {
+    public void testRunWithEncryption() throws InterruptedException, IOException, ExecutionException {
         int serverPort = 28001;
         tcpHandler = new TcpHandler(serverAddress, serverPort);
         tcpHandler.setChannelInitializer(mockChannelInitializer);
 
-        assertEquals( "failed to start server", true, startupServer()) ;
-        assertEquals( "wrong connection count", 0, tcpHandler.getNumberOfConnections() );
-        assertEquals( "wrong port", serverPort, tcpHandler.getPort() );
-        assertEquals( "wrong address", serverAddress.getHostAddress(), tcpHandler.getAddress()) ;
+        assertEquals( "failed to start server", true, startupServer(false));
+        assertEquals( "wrong connection count", 0, tcpHandler.getNumberOfConnections());
+        assertEquals( "wrong port", serverPort, tcpHandler.getPort());
+        assertEquals( "wrong address", serverAddress.getHostAddress(), tcpHandler.getAddress());
 
-        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort())) ;
+        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort()));
+
+        shutdownServer();
+    }
+
+    /**
+     * Test run with encryption on Epoll native transport
+     * @throws InterruptedException
+     * @throws IOException
+     * @throws ExecutionException
+     */
+    @Test
+    public void testRunWithEncryptionOnEpoll() throws InterruptedException, IOException, ExecutionException {
+        int serverPort = 28001;
+        tcpHandler = new TcpHandler(serverAddress, serverPort);
+        tcpHandler.setChannelInitializer(mockChannelInitializer);
+
+        //Use Epoll native transport
+        assertEquals( "failed to start server", true, startupServer(true));
+        assertEquals( "wrong connection count", 0, tcpHandler.getNumberOfConnections());
+        assertEquals( "wrong port", serverPort, tcpHandler.getPort());
+        assertEquals( "wrong address", serverAddress.getHostAddress(), tcpHandler.getAddress());
+
+        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort()));
 
         shutdownServer();
     }
@@ -123,7 +183,7 @@ public class TcpHandlerTest {
         try {
             tcpHandler = new TcpHandler(serverAddress, serverPort);
             tcpHandler.setChannelInitializer(mockChannelInitializer);
-            tcpHandler.initiateEventLoopGroups(null);
+            tcpHandler.initiateEventLoopGroups(null, false);
             tcpHandler.run();
         } catch (Exception e) {
             if (e instanceof BindException) {
@@ -134,6 +194,35 @@ public class TcpHandlerTest {
         Assert.assertTrue("Expected BindException has not been thrown", exceptionThrown == true);
     }
 
+    /**
+     * Test run on already used port
+     * @throws IOException
+     */
+    @Test
+    public void testSocketAlreadyInUseOnEpoll() throws IOException {
+        int serverPort = 28001;
+        Socket firstBinder = new Socket();
+        boolean exceptionThrown = false;
+        try {
+            firstBinder.bind(new InetSocketAddress(serverAddress, serverPort));
+        } catch (Exception e) {
+            Assert.fail("Test precondition failed - not able to bind socket to port " + serverPort);
+        }
+        try {
+            tcpHandler = new TcpHandler(serverAddress, serverPort);
+            tcpHandler.setChannelInitializer(mockChannelInitializer);
+            //Use Epoll native transport
+            tcpHandler.initiateEventLoopGroups(null, true);
+            tcpHandler.run();
+        } catch (Exception e) {
+            if (e instanceof BindException || e instanceof Errors.NativeIoException) {
+                exceptionThrown = true;
+            }
+        }
+        firstBinder.close();
+        Assert.assertTrue("Expected BindException has not been thrown", exceptionThrown == true);
+    }
+
     /**
      * Trigger the server shutdown and wait 2 seconds for completion
      */
@@ -149,9 +238,13 @@ public class TcpHandlerTest {
      * @throws IOException
      * @throws ExecutionException
      */
-    private Boolean startupServer() throws InterruptedException, IOException, ExecutionException {
+    private Boolean startupServer(boolean isEpollEnabled) throws InterruptedException, IOException, ExecutionException {
         ListenableFuture<Boolean> online = tcpHandler.getIsOnlineFuture();
-        tcpHandler.initiateEventLoopGroups(null);
+        /**
+         * Test EPoll based native transport if isEpollEnabled is true.
+         * Else use Nio based transport.
+         */
+        tcpHandler.initiateEventLoopGroups(null, isEpollEnabled);
             (new Thread(tcpHandler)).start();
             int retry = 0;
             while (online.isDone() != true && retry++ < 20) {
index 491e18de5237718df257af844d20ae4248540296..175595157e054c5422ad3745626b05bf98737d4f 100644 (file)
@@ -161,7 +161,7 @@ public class SwitchConnectionProviderImplTest {
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             Assert.fail();
         }
-    }
+        }
 
     /**
      * Tests correct provider shutdown
index 0dd9ce3b03776c57a931d2e05d21096b011bff95..ff36181f6d98d8f15e1417361803be9dc2f8a719 100644 (file)
@@ -47,7 +47,27 @@ public class UdpHandlerTest {
     public void testWithEmptyAddress() throws InterruptedException, ExecutionException, IOException {
         udpHandler = new UdpHandler(null, 0);
         udpHandler.setChannelInitializer(udpChannelInitializerMock);
-        Assert.assertTrue("Wrong - start server", startupServer());
+        Assert.assertTrue("Wrong - start server", startupServer(false));
+        try {
+            Assert.assertTrue(udpHandler.getIsOnlineFuture().get(1500,TimeUnit.MILLISECONDS).booleanValue());
+        } catch (TimeoutException e) {
+            Assert.fail("Wrong - getIsOnlineFuture timed out");
+        }
+        Assert.assertFalse("Wrong - port has been set to zero", udpHandler.getPort() == 0);
+        shutdownServer();
+    }
+
+    /**
+     * Test to create UdpHandler with empty address and zero port on Epoll native transport
+     * @throws InterruptedException
+     * @throws ExecutionException
+     * @throws IOException
+     */
+    @Test
+    public void testWithEmptyAddressOnEpoll() throws InterruptedException, ExecutionException, IOException {
+        udpHandler = new UdpHandler(null, 0);
+        udpHandler.setChannelInitializer(udpChannelInitializerMock);
+        Assert.assertTrue("Wrong - start server", startupServer(true));
         try {
             Assert.assertTrue(udpHandler.getIsOnlineFuture().get(1500,TimeUnit.MILLISECONDS).booleanValue());
         } catch (TimeoutException e) {
@@ -68,7 +88,7 @@ public class UdpHandlerTest {
         int port = 9874;
         udpHandler = new UdpHandler(InetAddress.getLocalHost(), port);
         udpHandler.setChannelInitializer(udpChannelInitializerMock);
-        Assert.assertTrue("Wrong - start server", startupServer());
+        Assert.assertTrue("Wrong - start server", startupServer(false));
         try {
             Assert.assertTrue(udpHandler.getIsOnlineFuture().get(1500,TimeUnit.MILLISECONDS).booleanValue());
         } catch (TimeoutException e) {
@@ -78,9 +98,34 @@ public class UdpHandlerTest {
         shutdownServer();
     }
 
-    private Boolean startupServer() throws InterruptedException, IOException, ExecutionException {
-        ListenableFuture<Boolean> online = udpHandler.getIsOnlineFuture();
+    /**
+     * Test to create UdpHandler with fill address and given port on Epoll native transport
+     * @throws InterruptedException
+     * @throws ExecutionException
+     * @throws IOException
+     */
+    @Test
+    public void testWithAddressAndPortOnEpoll() throws InterruptedException, ExecutionException, IOException{
+        int port = 9874;
+        udpHandler = new UdpHandler(InetAddress.getLocalHost(), port);
+        udpHandler.setChannelInitializer(udpChannelInitializerMock);
+        Assert.assertTrue("Wrong - start server", startupServer(true));
+        try {
+            Assert.assertTrue(udpHandler.getIsOnlineFuture().get(1500,TimeUnit.MILLISECONDS).booleanValue());
+        } catch (TimeoutException e) {
+            Assert.fail("Wrong - getIsOnlineFuture timed out");
+        }
+        Assert.assertEquals("Wrong - bad port number has been set", port, udpHandler.getPort());
+        shutdownServer();
+    }
 
+    private Boolean startupServer(boolean isEpollEnabled) throws InterruptedException, IOException, ExecutionException {
+        ListenableFuture<Boolean> online = udpHandler.getIsOnlineFuture();
+        /**
+         * Test EPoll based native transport if isEpollEnabled is true.
+         * Else use Nio based transport.
+         */
+        udpHandler.initiateEventLoopGroups(null, isEpollEnabled);
             (new Thread(udpHandler)).start();
             int retry = 0;
             while (online.isDone() != true && retry++ < 20) {
index a2fe3a117ae359de4a6b7652d17a65af5600fe98..98a6e1ff428f7b38795f120b912342fe7b371ea8 100644 (file)
@@ -11,7 +11,7 @@ package org.opendaylight.openflowjava.protocol.impl.clients;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslHandler;
 
 import javax.net.ssl.SSLEngine;
@@ -22,7 +22,7 @@ import com.google.common.util.concurrent.SettableFuture;
  *
  * @author michal.polkorab
  */
-public class SimpleClientInitializer extends ChannelInitializer<NioSocketChannel> {
+public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {
 
     private SettableFuture<Boolean> isOnlineFuture;
     private boolean secured;
@@ -38,7 +38,7 @@ public class SimpleClientInitializer extends ChannelInitializer<NioSocketChannel
     }
 
     @Override
-    public void initChannel(NioSocketChannel ch) throws Exception {
+    public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
         if (secured) {
             SSLEngine engine = ClientSslContextFactory.getClientContext()
index 11444427f0712026cafcc5eefad03904cddc7c00..a68b6ab795bd42d73ce15b95fe365d6095ebb57e 100644 (file)
@@ -11,7 +11,7 @@ package org.opendaylight.openflowjava.protocol.impl.clients;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.DatagramChannel;
 
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -19,7 +19,7 @@ import com.google.common.util.concurrent.SettableFuture;
  *
  * @author michal.polkorab
  */
-public class UdpSimpleClientInitializer extends ChannelInitializer<NioDatagramChannel> {
+public class UdpSimpleClientInitializer extends ChannelInitializer<DatagramChannel> {
 
     private SettableFuture<Boolean> isOnlineFuture;
     private ScenarioHandler scenarioHandler;
@@ -32,7 +32,7 @@ public class UdpSimpleClientInitializer extends ChannelInitializer<NioDatagramCh
     }
 
     @Override
-    public void initChannel(NioDatagramChannel ch) throws Exception {
+    public void initChannel(DatagramChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
         SimpleClientHandler simpleClientHandler = new SimpleClientHandler(isOnlineFuture, scenarioHandler);
         simpleClientHandler.setScenario(scenarioHandler);