BUG-54 : switched channel pipeline to be protocol specific.
[bgpcep.git] / framework / src / main / java / org / opendaylight / protocol / framework / AbstractDispatcher.java
index 2179671fde1d6b320311007abffcd50be367ff59..98021be1d77077b0bc60b5104479e993ea7cbcce 100644 (file)
@@ -7,15 +7,20 @@
  */
 package org.opendaylight.protocol.framework;
 
+import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
 
 import java.io.Closeable;
 import java.net.InetSocketAddress;
@@ -23,13 +28,13 @@ import java.net.InetSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
  * start method that will handle sockets in different thread.
  */
-public abstract class AbstractDispatcher implements Closeable {
+public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
 
        private static final Logger logger = LoggerFactory.getLogger(AbstractDispatcher.class);
 
@@ -43,25 +48,35 @@ public abstract class AbstractDispatcher implements Closeable {
                this.workerGroup = new NioEventLoopGroup();
        }
 
+       /**
+        * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
+        * method needs to be implemented in protocol specific Dispatchers.
+        * 
+        * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
+        * @param promise to be passed to {@link SessionNegotiatorFactory}
+        */
+       public abstract void initializeChannel(SocketChannel channel, Promise<S> promise, final SessionListenerFactory<L> lfactory);
+
        /**
         * Creates server. Each server needs factories to pass their instances to client sessions.
         * 
         * @param address address to which the server should be bound
-        * @param listenerFactory factory for creating protocol listeners, passed to the negotiator
-        * @param negotiatorFactory protocol session negotiator factory
-        * @param messageFactory message parser
         * 
         * @return ChannelFuture representing the binding process
         */
-       protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> ChannelFuture createServer(
-                       final InetSocketAddress address, final SessionListenerFactory<L> listenerFactory,
-                       final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolMessageFactory<M> messageFactory) {
+       @VisibleForTesting
+       public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<L> lfactory) {
                final ServerBootstrap b = new ServerBootstrap();
                b.group(this.bossGroup, this.workerGroup);
                b.channel(NioServerSocketChannel.class);
                b.option(ChannelOption.SO_BACKLOG, 128);
-               b.childHandler(new ChannelInitializerImpl<M, S, L>(negotiatorFactory,
-                               listenerFactory, new ProtocolHandlerFactory<M>(messageFactory), new DefaultPromise<S>(GlobalEventExecutor.INSTANCE)));
+               b.childHandler(new ChannelInitializer<SocketChannel>() {
+
+                       @Override
+                       protected void initChannel(final SocketChannel ch) throws Exception {
+                               initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE), lfactory);
+                       }
+               });
                b.childOption(ChannelOption.SO_KEEPALIVE, true);
 
                // Bind and start to accept incoming connections.
@@ -75,30 +90,24 @@ public abstract class AbstractDispatcher implements Closeable {
         * Creates a client.
         * 
         * @param address remote address
-        * @param listener session listener
-        * @param negotiatorFactory session negotiator factory
-        * @param messageFactory message parser
         * @param connectStrategy Reconnection strategy to be used when initial connection fails
         * 
-        * @return Future representing the connection process. Its result represents
-        *         the combined success of TCP connection as well as session negotiation.
+        * @return Future representing the connection process. Its result represents the combined success of TCP connection
+        *         as well as session negotiation.
         */
-       protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<S> createClient(
-                       final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
-                       final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategy strategy) {
-               final ProtocolSessionPromise<M, S, L> p = new ProtocolSessionPromise<M, S, L>(workerGroup, address, negotiatorFactory,
-                               new SessionListenerFactory<L>() {
-                       private boolean created = false;
-
-                       @Override
-                       public synchronized L getSessionListener() {
-                               Preconditions.checkState(created == false);
-                               created = true;
-                               return listener;
-                       }
-
-               }, new ProtocolHandlerFactory<M>(messageFactory), strategy);
-
+       @VisibleForTesting
+       public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
+                       final SessionListenerFactory<L> lfactory) {
+               final Bootstrap b = new Bootstrap();
+               final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(address, strategy, b);
+               b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
+                               new ChannelInitializer<SocketChannel>() {
+
+                                       @Override
+                                       protected void initChannel(final SocketChannel ch) throws Exception {
+                                               initializeChannel(ch, p, lfactory);
+                                       }
+                               });
                p.connect();
                logger.debug("Client created.");
                return p;
@@ -108,25 +117,16 @@ public abstract class AbstractDispatcher implements Closeable {
         * Creates a client.
         * 
         * @param address remote address
-        * @param listener session listener
-        * @param negotiatorFactory session negotiator factory
-        * @param messageFactory message parser
         * @param connectStrategyFactory Factory for creating reconnection strategy to be used when initial connection fails
         * @param reestablishStrategy Reconnection strategy to be used when the already-established session fails
         * 
-        * @return Future representing the reconnection task. It will report
-        *         completion based on reestablishStrategy, e.g. success if
-        *         it indicates no further attempts should be made and failure
-        *         if it reports an error
+        * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
+        *         success if it indicates no further attempts should be made and failure if it reports an error
         */
-       protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<Void> createReconnectingClient(
-                       final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
-                       final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategyFactory connectStrategyFactory,
-                       final ReconnectStrategy reestablishStrategy) {
-
-               final ReconnectPromise<M, S, L> p = new ReconnectPromise<M, S, L>(this, address, listener, negotiatorFactory,
-                               messageFactory, connectStrategyFactory, reestablishStrategy);
+       protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
+                       final ReconnectStrategy reestablishStrategy, final SessionListenerFactory<L> lfactory) {
 
+               final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, lfactory);
                p.connect();
 
                return p;