Merge "Fixed for bug : 1171 - issue while creating subnet"
[controller.git] / opendaylight / commons / protocol-framework / src / main / java / org / opendaylight / protocol / framework / AbstractDispatcher.java
index 916ef9a88befa87ac8b5c17902ec6d970f23807d..a62bd7da06501b355c41f36230dbf30a982d61fa 100644 (file)
@@ -7,12 +7,18 @@
  */
 package org.opendaylight.protocol.framework;
 
+import com.google.common.base.Preconditions;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.local.LocalServerChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -24,19 +30,19 @@ import io.netty.util.concurrent.Promise;
 
 import java.io.Closeable;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
  * start method that will handle sockets in different thread.
  */
 public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
 
-    protected interface PipelineInitializer<S extends ProtocolSession<?>> {
+
+    protected interface ChannelPipelineInitializer<CH extends Channel, S extends ProtocolSession<?>> {
         /**
          * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
          * method needs to be implemented in protocol specific Dispatchers.
@@ -44,7 +50,11 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
          * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
          * @param promise to be passed to {@link SessionNegotiatorFactory}
          */
-        void initializeChannel(SocketChannel channel, Promise<S> promise);
+        void initializeChannel(CH channel, Promise<S> promise);
+    }
+
+    protected interface PipelineInitializer<S extends ProtocolSession<?>> extends ChannelPipelineInitializer<SocketChannel, S> {
+
     }
 
 
@@ -76,25 +86,44 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
      * @return ChannelFuture representing the binding process
      */
     protected ChannelFuture createServer(final InetSocketAddress address, final PipelineInitializer<S> initializer) {
+        return createServer(address, NioServerSocketChannel.class, initializer);
+    }
+
+    /**
+     * Creates server. Each server needs factories to pass their instances to client sessions.
+     *
+     * @param address address to which the server should be bound
+     * @param channelClass The {@link Class} which is used to create {@link Channel} instances from.
+     * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
+     *
+     * @return ChannelFuture representing the binding process
+     */
+    protected <CH extends Channel> ChannelFuture createServer(final SocketAddress address, final Class<? extends ServerChannel> channelClass,
+            final ChannelPipelineInitializer<CH, S> initializer) {
         final ServerBootstrap b = new ServerBootstrap();
-        b.childHandler(new ChannelInitializer<SocketChannel>() {
+        b.childHandler(new ChannelInitializer<CH>() {
 
             @Override
-            protected void initChannel(final SocketChannel ch) {
+            protected void initChannel(final CH ch) {
                 initializer.initializeChannel(ch, new DefaultPromise<S>(executor));
             }
         });
 
         b.option(ChannelOption.SO_BACKLOG, 128);
-        b.childOption(ChannelOption.SO_KEEPALIVE, true);
+        if (LocalServerChannel.class.equals(channelClass) == false) {
+            // makes no sense for LocalServer and produces warning
+            b.childOption(ChannelOption.SO_KEEPALIVE, true);
+        }
+        b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         customizeBootstrap(b);
 
         if (b.group() == null) {
             b.group(bossGroup, workerGroup);
         }
         try {
-            b.channel(NioServerSocketChannel.class);
+            b.channel(channelClass);
         } catch (IllegalStateException e) {
+            // FIXME: if this is ok, document why
             LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
         }
 
@@ -127,9 +156,8 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
     protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
         final Bootstrap b = new Bootstrap();
         final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(executor, address, strategy, b);
-        b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
+        b.option(ChannelOption.SO_KEEPALIVE, true).handler(
                 new ChannelInitializer<SocketChannel>() {
-
                     @Override
                     protected void initChannel(final SocketChannel ch) {
                         initializer.initializeChannel(ch, p);
@@ -138,6 +166,18 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
 
         customizeBootstrap(b);
 
+        if (b.group() == null) {
+            b.group(workerGroup);
+        }
+
+        // There is no way to detect if this was already set by
+        // customizeBootstrap()
+        try {
+            b.channel(NioSocketChannel.class);
+        } catch (IllegalStateException e) {
+            LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+        }
+
         p.connect();
         LOG.debug("Client created.");
         return p;