X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fcommons%2Fprotocol-framework%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fframework%2FAbstractDispatcher.java;h=7b8816bd1b38843f2b0440c2bc565beaa13144fa;hp=fef2c7196948c007705b4444a5c9445065618648;hb=ea3673e89598b896c93ebee864e6cb8db7f6c6ec;hpb=8115c4f148589ee00ec6aebc13eaf10f9975c83b diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java index fef2c71969..7b8816bd1b 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java @@ -7,9 +7,18 @@ */ package org.opendaylight.protocol.framework; +import java.io.Closeable; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Preconditions; + import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; @@ -25,16 +34,12 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; -import java.io.Closeable; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the * start method that will handle sockets in different thread. */ +@Deprecated public abstract class AbstractDispatcher, L extends SessionListener> implements Closeable { @@ -94,8 +99,8 @@ public abstract class AbstractDispatcher, L extends * * @return ChannelFuture representing the binding process */ - protected ChannelFuture createServer(SocketAddress address, Class channelClass, - final ChannelPipelineInitializer initializer) { + protected ChannelFuture createServer(final SocketAddress address, final Class channelClass, + final ChannelPipelineInitializer initializer) { final ServerBootstrap b = new ServerBootstrap(); b.childHandler(new ChannelInitializer() { @@ -110,6 +115,7 @@ public abstract class AbstractDispatcher, L extends // makes no sense for LocalServer and produces warning b.childOption(ChannelOption.SO_KEEPALIVE, true); } + b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); customizeBootstrap(b); if (b.group() == null) { @@ -150,10 +156,9 @@ public abstract class AbstractDispatcher, L extends */ protected Future createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer initializer) { final Bootstrap b = new Bootstrap(); - final ProtocolSessionPromise p = new ProtocolSessionPromise(executor, address, strategy, b); - b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler( + final ProtocolSessionPromise p = new ProtocolSessionPromise<>(executor, address, strategy, b); + b.option(ChannelOption.SO_KEEPALIVE, true).handler( new ChannelInitializer() { - @Override protected void initChannel(final SocketChannel ch) { initializer.initializeChannel(ch, p); @@ -161,6 +166,36 @@ public abstract class AbstractDispatcher, L extends }); customizeBootstrap(b); + setWorkerGroup(b); + setChannelFactory(b); + + p.connect(); + LOG.debug("Client created."); + return p; + } + + private void setWorkerGroup(final Bootstrap b) { + if (b.group() == null) { + b.group(workerGroup); + } + } + + /** + * Create a client but use a pre-configured bootstrap. + * This method however replaces the ChannelInitializer in the bootstrap. All other configuration is preserved. + * + * @param address remote address + */ + protected Future createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap, final PipelineInitializer initializer) { + final ProtocolSessionPromise p = new ProtocolSessionPromise<>(executor, address, strategy, bootstrap); + + bootstrap.handler( + new ChannelInitializer() { + @Override + protected void initChannel(final SocketChannel ch) { + initializer.initializeChannel(ch, p); + } + }); p.connect(); LOG.debug("Client created."); @@ -179,6 +214,9 @@ public abstract class AbstractDispatcher, L extends } /** + * + * @deprecated use {@link org.opendaylight.protocol.framework.AbstractDispatcher#createReconnectingClient(java.net.InetSocketAddress, ReconnectStrategyFactory, org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer)} with only one reconnectStrategyFactory instead. + * * Creates a client. * * @param address remote address @@ -188,15 +226,47 @@ public abstract class AbstractDispatcher, L extends * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g. * success if it indicates no further attempts should be made and failure if it reports an error */ + @Deprecated protected Future createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy, final PipelineInitializer initializer) { + return createReconnectingClient(address, connectStrategyFactory, initializer); + } - final ReconnectPromise p = new ReconnectPromise(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, reestablishStrategy, initializer); - p.connect(); + /** + * Creates a reconnecting client. + * + * @param address remote address + * @param connectStrategyFactory Factory for creating reconnection strategy for every reconnect attempt + * + * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g. + * success is never reported, only failure when it runs out of reconnection attempts. + */ + protected Future createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory, + final PipelineInitializer initializer) { + final Bootstrap b = new Bootstrap(); + + final ReconnectPromise p = new ReconnectPromise<>(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, b, initializer); + + b.option(ChannelOption.SO_KEEPALIVE, true); + + customizeBootstrap(b); + setWorkerGroup(b); + setChannelFactory(b); + p.connect(); return p; } + private void setChannelFactory(final Bootstrap b) { + // There is no way to detect if this was already set by + // customizeBootstrap() + try { + b.channel(NioSocketChannel.class); + } catch (final IllegalStateException e) { + LOG.trace("Not overriding channelFactory on bootstrap {}", b, e); + } + } + /** * @deprecated Should only be used with {@link AbstractDispatcher#AbstractDispatcher()} */ @@ -209,5 +279,4 @@ public abstract class AbstractDispatcher, L extends this.bossGroup.shutdownGracefully(); } } - }