X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fcommons%2Fprotocol-framework%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fframework%2FAbstractDispatcher.java;h=80e30dc90c61e914dcc32a089197eff16f2886ee;hp=a62bd7da06501b355c41f36230dbf30a982d61fa;hb=6ca44d2095f0887508dd32f0174058a627eff4f9;hpb=b3d0ded2590e6a5a61055010f7b24e9a943c8d31 diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java index a62bd7da06..80e30dc90c 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java @@ -8,11 +8,10 @@ package org.opendaylight.protocol.framework; import com.google.common.base.Preconditions; - import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -27,11 +26,9 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; - import java.io.Closeable; import java.net.InetSocketAddress; import java.net.SocketAddress; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +36,7 @@ import org.slf4j.LoggerFactory; * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the * start method that will handle sockets in different thread. */ +@Deprecated public abstract class AbstractDispatcher, L extends SessionListener> implements Closeable { @@ -105,7 +103,7 @@ public abstract class AbstractDispatcher, L extends @Override protected void initChannel(final CH ch) { - initializer.initializeChannel(ch, new DefaultPromise(executor)); + initializer.initializeChannel(ch, new DefaultPromise<>(executor)); } }); @@ -113,6 +111,7 @@ public abstract class AbstractDispatcher, L extends if (LocalServerChannel.class.equals(channelClass) == false) { // makes no sense for LocalServer and produces warning b.childOption(ChannelOption.SO_KEEPALIVE, true); + b.childOption(ChannelOption.TCP_NODELAY , true); } b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); customizeBootstrap(b); @@ -122,7 +121,7 @@ public abstract class AbstractDispatcher, L extends } try { b.channel(channelClass); - } catch (IllegalStateException e) { + } catch (final IllegalStateException e) { // FIXME: if this is ok, document why LOG.trace("Not overriding channelFactory on bootstrap {}", b, e); } @@ -148,14 +147,14 @@ public abstract class AbstractDispatcher, L extends * Creates a client. * * @param address remote address - * @param connectStrategy Reconnection strategy to be used when initial connection fails + * @param strategy Reconnection strategy to be used when initial connection fails * * @return Future representing the connection process. Its result represents the combined success of TCP connection * as well as session negotiation. */ protected Future createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer initializer) { final Bootstrap b = new Bootstrap(); - final ProtocolSessionPromise p = new ProtocolSessionPromise(executor, address, strategy, b); + final ProtocolSessionPromise p = new ProtocolSessionPromise<>(executor, address, strategy, b); b.option(ChannelOption.SO_KEEPALIVE, true).handler( new ChannelInitializer() { @Override @@ -165,18 +164,36 @@ public abstract class AbstractDispatcher, L extends }); customizeBootstrap(b); + setWorkerGroup(b); + setChannelFactory(b); + p.connect(); + LOG.debug("Client created."); + return p; + } + + private void setWorkerGroup(final Bootstrap b) { if (b.group() == null) { b.group(workerGroup); } + } - // There is no way to detect if this was already set by - // customizeBootstrap() - try { - b.channel(NioSocketChannel.class); - } catch (IllegalStateException e) { - LOG.trace("Not overriding channelFactory on bootstrap {}", b, e); - } + /** + * Create a client but use a pre-configured bootstrap. + * This method however replaces the ChannelInitializer in the bootstrap. All other configuration is preserved. + * + * @param address remote address + */ + protected Future createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap, final PipelineInitializer initializer) { + final ProtocolSessionPromise p = new ProtocolSessionPromise<>(executor, address, strategy, bootstrap); + + bootstrap.handler( + new ChannelInitializer() { + @Override + protected void initChannel(final SocketChannel ch) { + initializer.initializeChannel(ch, p); + } + }); p.connect(); LOG.debug("Client created."); @@ -195,6 +212,9 @@ public abstract class AbstractDispatcher, L extends } /** + * + * @deprecated use {@link org.opendaylight.protocol.framework.AbstractDispatcher#createReconnectingClient(java.net.InetSocketAddress, ReconnectStrategyFactory, org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer)} with only one reconnectStrategyFactory instead. + * * Creates a client. * * @param address remote address @@ -204,26 +224,52 @@ public abstract class AbstractDispatcher, L extends * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g. * success if it indicates no further attempts should be made and failure if it reports an error */ + @Deprecated protected Future createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy, final PipelineInitializer initializer) { + return createReconnectingClient(address, connectStrategyFactory, initializer); + } - final ReconnectPromise p = new ReconnectPromise(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, reestablishStrategy, initializer); - p.connect(); + /** + * Creates a reconnecting client. + * + * @param address remote address + * @param connectStrategyFactory Factory for creating reconnection strategy for every reconnect attempt + * + * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g. + * success is never reported, only failure when it runs out of reconnection attempts. + */ + protected Future createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory, + final PipelineInitializer initializer) { + final Bootstrap b = new Bootstrap(); + + final ReconnectPromise p = new ReconnectPromise<>(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, b, initializer); + + b.option(ChannelOption.SO_KEEPALIVE, true); + customizeBootstrap(b); + setWorkerGroup(b); + setChannelFactory(b); + + p.connect(); return p; } + private void setChannelFactory(final Bootstrap b) { + // There is no way to detect if this was already set by + // customizeBootstrap() + try { + b.channel(NioSocketChannel.class); + } catch (final IllegalStateException e) { + LOG.trace("Not overriding channelFactory on bootstrap {}", b, e); + } + } + /** - * @deprecated Should only be used with {@link AbstractDispatcher#AbstractDispatcher()} + * @deprecated Should only be used with AbstractDispatcher#AbstractDispatcher() */ @Deprecated @Override public void close() { - try { - this.workerGroup.shutdownGracefully(); - } finally { - this.bossGroup.shutdownGracefully(); - } } - }