*/
package org.opendaylight.protocol.framework;
+import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
import java.io.Closeable;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
/**
* Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
* start method that will handle sockets in different thread.
*/
-public abstract class AbstractDispatcher implements Closeable {
+public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(AbstractDispatcher.class);
this.workerGroup = new NioEventLoopGroup();
}
+ /**
+ * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
+ * method needs to be implemented in protocol specific Dispatchers.
+ *
+ * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
+ * @param promise to be passed to {@link SessionNegotiatorFactory}
+ */
+ public abstract void initializeChannel(SocketChannel channel, Promise<S> promise, final SessionListenerFactory<L> lfactory);
+
/**
* Creates server. Each server needs factories to pass their instances to client sessions.
*
* @param address address to which the server should be bound
- * @param listenerFactory factory for creating protocol listeners, passed to the negotiator
- * @param negotiatorFactory protocol session negotiator factory
- * @param messageFactory message parser
*
* @return ChannelFuture representing the binding process
*/
- protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> ChannelFuture createServer(
- final InetSocketAddress address, final SessionListenerFactory<L> listenerFactory,
- final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolMessageFactory<M> messageFactory) {
+ @VisibleForTesting
+ public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<L> lfactory) {
final ServerBootstrap b = new ServerBootstrap();
b.group(this.bossGroup, this.workerGroup);
b.channel(NioServerSocketChannel.class);
b.option(ChannelOption.SO_BACKLOG, 128);
- b.childHandler(new ChannelInitializerImpl<M, S, L>(negotiatorFactory,
- listenerFactory, new ProtocolHandlerFactory<M>(messageFactory), new DefaultPromise<S>(GlobalEventExecutor.INSTANCE)));
+ b.childHandler(new ChannelInitializer<SocketChannel>() {
+
+ @Override
+ protected void initChannel(final SocketChannel ch) throws Exception {
+ initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE), lfactory);
+ }
+ });
b.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
* Creates a client.
*
* @param address remote address
- * @param listener session listener
- * @param negotiatorFactory session negotiator factory
- * @param messageFactory message parser
* @param connectStrategy Reconnection strategy to be used when initial connection fails
*
- * @return Future representing the connection process. Its result represents
- * the combined success of TCP connection as well as session negotiation.
+ * @return Future representing the connection process. Its result represents the combined success of TCP connection
+ * as well as session negotiation.
*/
- protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<S> createClient(
- final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
- final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategy strategy) {
- final ProtocolSessionPromise<M, S, L> p = new ProtocolSessionPromise<M, S, L>(workerGroup, address, negotiatorFactory,
- new SessionListenerFactory<L>() {
- private boolean created = false;
-
- @Override
- public synchronized L getSessionListener() {
- Preconditions.checkState(created == false);
- created = true;
- return listener;
- }
-
- }, new ProtocolHandlerFactory<M>(messageFactory), strategy);
-
+ @VisibleForTesting
+ public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
+ final SessionListenerFactory<L> lfactory) {
+ final Bootstrap b = new Bootstrap();
+ final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(address, strategy, b);
+ b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
+ new ChannelInitializer<SocketChannel>() {
+
+ @Override
+ protected void initChannel(final SocketChannel ch) throws Exception {
+ initializeChannel(ch, p, lfactory);
+ }
+ });
p.connect();
logger.debug("Client created.");
return p;
* Creates a client.
*
* @param address remote address
- * @param listener session listener
- * @param negotiatorFactory session negotiator factory
- * @param messageFactory message parser
* @param connectStrategyFactory Factory for creating reconnection strategy to be used when initial connection fails
* @param reestablishStrategy Reconnection strategy to be used when the already-established session fails
*
- * @return Future representing the reconnection task. It will report
- * completion based on reestablishStrategy, e.g. success if
- * it indicates no further attempts should be made and failure
- * if it reports an error
+ * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
+ * success if it indicates no further attempts should be made and failure if it reports an error
*/
- protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<Void> createReconnectingClient(
- final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
- final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategyFactory connectStrategyFactory,
- final ReconnectStrategy reestablishStrategy) {
-
- final ReconnectPromise<M, S, L> p = new ReconnectPromise<M, S, L>(this, address, listener, negotiatorFactory,
- messageFactory, connectStrategyFactory, reestablishStrategy);
+ protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
+ final ReconnectStrategy reestablishStrategy, final SessionListenerFactory<L> lfactory) {
+ final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, lfactory);
p.connect();
return p;