X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fcommons%2Fprotocol-framework%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fframework%2FAbstractDispatcher.java;h=a62bd7da06501b355c41f36230dbf30a982d61fa;hp=916ef9a88befa87ac8b5c17902ec6d970f23807d;hb=c222e37f2a0f0f3f6266242fbea2d3b018f4e6e3;hpb=31b7a44c89d1057489338492fcf62a64147bea24 diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java index 916ef9a88b..a62bd7da06 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java @@ -7,12 +7,18 @@ */ package org.opendaylight.protocol.framework; +import com.google.common.base.Preconditions; + import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.local.LocalServerChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; @@ -24,19 +30,19 @@ import io.netty.util.concurrent.Promise; import java.io.Closeable; import java.net.InetSocketAddress; +import java.net.SocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - /** * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the * start method that will handle sockets in different thread. */ public abstract class AbstractDispatcher, L extends SessionListener> implements Closeable { - protected interface PipelineInitializer> { + + protected interface ChannelPipelineInitializer> { /** * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this * method needs to be implemented in protocol specific Dispatchers. @@ -44,7 +50,11 @@ public abstract class AbstractDispatcher, L extends * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory} * @param promise to be passed to {@link SessionNegotiatorFactory} */ - void initializeChannel(SocketChannel channel, Promise promise); + void initializeChannel(CH channel, Promise promise); + } + + protected interface PipelineInitializer> extends ChannelPipelineInitializer { + } @@ -76,25 +86,44 @@ public abstract class AbstractDispatcher, L extends * @return ChannelFuture representing the binding process */ protected ChannelFuture createServer(final InetSocketAddress address, final PipelineInitializer initializer) { + return createServer(address, NioServerSocketChannel.class, initializer); + } + + /** + * Creates server. Each server needs factories to pass their instances to client sessions. + * + * @param address address to which the server should be bound + * @param channelClass The {@link Class} which is used to create {@link Channel} instances from. + * @param initializer instance of PipelineInitializer used to initialize the channel pipeline + * + * @return ChannelFuture representing the binding process + */ + protected ChannelFuture createServer(final SocketAddress address, final Class channelClass, + final ChannelPipelineInitializer initializer) { final ServerBootstrap b = new ServerBootstrap(); - b.childHandler(new ChannelInitializer() { + b.childHandler(new ChannelInitializer() { @Override - protected void initChannel(final SocketChannel ch) { + protected void initChannel(final CH ch) { initializer.initializeChannel(ch, new DefaultPromise(executor)); } }); b.option(ChannelOption.SO_BACKLOG, 128); - b.childOption(ChannelOption.SO_KEEPALIVE, true); + if (LocalServerChannel.class.equals(channelClass) == false) { + // makes no sense for LocalServer and produces warning + b.childOption(ChannelOption.SO_KEEPALIVE, true); + } + b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); customizeBootstrap(b); if (b.group() == null) { b.group(bossGroup, workerGroup); } try { - b.channel(NioServerSocketChannel.class); + b.channel(channelClass); } catch (IllegalStateException e) { + // FIXME: if this is ok, document why LOG.trace("Not overriding channelFactory on bootstrap {}", b, e); } @@ -127,9 +156,8 @@ public abstract class AbstractDispatcher, L extends protected Future createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer initializer) { final Bootstrap b = new Bootstrap(); final ProtocolSessionPromise p = new ProtocolSessionPromise(executor, address, strategy, b); - b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler( + b.option(ChannelOption.SO_KEEPALIVE, true).handler( new ChannelInitializer() { - @Override protected void initChannel(final SocketChannel ch) { initializer.initializeChannel(ch, p); @@ -138,6 +166,18 @@ public abstract class AbstractDispatcher, L extends customizeBootstrap(b); + if (b.group() == null) { + b.group(workerGroup); + } + + // There is no way to detect if this was already set by + // customizeBootstrap() + try { + b.channel(NioSocketChannel.class); + } catch (IllegalStateException e) { + LOG.trace("Not overriding channelFactory on bootstrap {}", b, e); + } + p.connect(); LOG.debug("Client created."); return p;