Fix the dispatcher not being threadsafe
[bgpcep.git] / framework / src / main / java / org / opendaylight / protocol / framework / AbstractDispatcher.java
index 98021be1d77077b0bc60b5104479e993ea7cbcce..afea289c007f1dfdb8df9aa90f0ce5f622f5397b 100644 (file)
@@ -28,14 +28,24 @@ import java.net.InetSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
  * start method that will handle sockets in different thread.
  */
 public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
 
+       protected interface PipelineInitializer<S extends ProtocolSession<?>> {
+               /**
+                * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
+                * method needs to be implemented in protocol specific Dispatchers.
+                * 
+                * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
+                * @param promise to be passed to {@link SessionNegotiatorFactory}
+                */
+               public void initializeChannel(SocketChannel channel, Promise<S> promise);
+       }
+
+
        private static final Logger logger = LoggerFactory.getLogger(AbstractDispatcher.class);
 
        private final EventLoopGroup bossGroup;
@@ -48,24 +58,15 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
                this.workerGroup = new NioEventLoopGroup();
        }
 
-       /**
-        * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
-        * method needs to be implemented in protocol specific Dispatchers.
-        * 
-        * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
-        * @param promise to be passed to {@link SessionNegotiatorFactory}
-        */
-       public abstract void initializeChannel(SocketChannel channel, Promise<S> promise, final SessionListenerFactory<L> lfactory);
-
        /**
         * Creates server. Each server needs factories to pass their instances to client sessions.
         * 
         * @param address address to which the server should be bound
+        * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
         * 
         * @return ChannelFuture representing the binding process
         */
-       @VisibleForTesting
-       public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<L> lfactory) {
+       protected ChannelFuture createServer(final InetSocketAddress address, final PipelineInitializer<S> initializer) {
                final ServerBootstrap b = new ServerBootstrap();
                b.group(this.bossGroup, this.workerGroup);
                b.channel(NioServerSocketChannel.class);
@@ -74,7 +75,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
 
                        @Override
                        protected void initChannel(final SocketChannel ch) throws Exception {
-                               initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE), lfactory);
+                               initializer.initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE));
                        }
                });
                b.childOption(ChannelOption.SO_KEEPALIVE, true);
@@ -95,9 +96,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
         * @return Future representing the connection process. Its result represents the combined success of TCP connection
         *         as well as session negotiation.
         */
-       @VisibleForTesting
-       public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
-                       final SessionListenerFactory<L> lfactory) {
+       protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
                final Bootstrap b = new Bootstrap();
                final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(address, strategy, b);
                b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
@@ -105,7 +104,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
 
                                        @Override
                                        protected void initChannel(final SocketChannel ch) throws Exception {
-                                               initializeChannel(ch, p, lfactory);
+                                               initializer.initializeChannel(ch, p);
                                        }
                                });
                p.connect();
@@ -124,9 +123,9 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
         *         success if it indicates no further attempts should be made and failure if it reports an error
         */
        protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
-                       final ReconnectStrategy reestablishStrategy, final SessionListenerFactory<L> lfactory) {
+                       final ReconnectStrategy reestablishStrategy, final PipelineInitializer<S> initializer) {
 
-               final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, lfactory);
+               final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, initializer);
                p.connect();
 
                return p;