Fix the dispatcher not being threadsafe 64/1464/4
authorRobert Varga <rovarga@cisco.com>
Thu, 19 Sep 2013 06:45:40 +0000 (08:45 +0200)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 27 Sep 2013 20:36:15 +0000 (20:36 +0000)
It turns out that the createServer/createClient calls actually need a
initializer which is protocol-specific and that listener needs to be
more than an abstract method, as there is protocol-specific state which
needs to be maintained.

Change-Id: Ia70c01183b23f79f7f1d8376cbe04c6af2f549d1
Signed-off-by: Robert Varga <rovarga@cisco.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java
bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/BGPSpeakerMock.java
framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java
framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java
framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java
pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPDispatcher.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPDispatcherImpl.java
pcep/testtool/src/test/java/org/opendaylight/protocol/pcep/testtool/PCCMock.java

index 35081a209646a64173ebc6e0fc9cee33c70c4d93..0878e6037be4812bfdcc2d60ab1272b413471ca3 100644 (file)
@@ -30,8 +30,6 @@ import org.opendaylight.protocol.framework.SessionListenerFactory;
 public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl, BGPSessionListener> implements BGPDispatcher {
        private final Timer timer = new HashedWheelTimer();
 
-       private BGPSessionNegotiatorFactory snf;
-
        private final BGPHandlerFactory hf;
 
        public BGPDispatcherImpl(final BGPMessageFactory parser) {
@@ -42,22 +40,20 @@ public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl,
        @Override
        public Future<? extends BGPSession> createClient(final InetSocketAddress address, final BGPSessionPreferences preferences,
                        final BGPSessionListener listener, final ReconnectStrategy strategy) {
-               this.snf = new BGPSessionNegotiatorFactory(this.timer, preferences);
+               final BGPSessionNegotiatorFactory snf = new BGPSessionNegotiatorFactory(this.timer, preferences);
                final SessionListenerFactory<BGPSessionListener> slf = new SessionListenerFactory<BGPSessionListener>() {
-
                        @Override
                        public BGPSessionListener getSessionListener() {
                                return listener;
                        }
                };
-               return super.createClient(address, strategy, slf);
-       }
-
-       @Override
-       public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise,
-                       final SessionListenerFactory<BGPSessionListener> slf) {
-               ch.pipeline().addLast(this.hf.getDecoders());
-               ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(slf, ch, promise));
-               ch.pipeline().addLast(this.hf.getEncoders());
+               return super.createClient(address, strategy, new PipelineInitializer<BGPSessionImpl>() {
+                       @Override
+                       public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
+                               ch.pipeline().addLast(hf.getDecoders());
+                               ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(slf, ch, promise));
+                               ch.pipeline().addLast(hf.getEncoders());
+                       }
+               });
        }
 }
index 38a324867aeef5e75c02b8a13005520b4a8367a4..e5d11fd7659a8437d1403daf1a4cecaf8a656636 100644 (file)
@@ -47,11 +47,16 @@ AbstractDispatcher<S, L> {
                this.factory = Preconditions.checkNotNull(factory);
        }
 
-       @Override
-       public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> listenerFactory) {
-               ch.pipeline().addLast(this.factory.getDecoders());
-               ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
-               ch.pipeline().addLast(this.factory.getEncoders());
+       public void createServer(final InetSocketAddress address, final SessionListenerFactory<L> listenerFactory) {
+               super.createServer(address, new PipelineInitializer<S>() {
+
+                       @Override
+                       public void initializeChannel(final SocketChannel ch, final Promise<S> promise) {
+                               ch.pipeline().addLast(factory.getDecoders());
+                               ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
+                               ch.pipeline().addLast(factory.getEncoders());
+                       }
+               });
        }
 
        public static void main(final String[] args) throws IOException {
index 98021be1d77077b0bc60b5104479e993ea7cbcce..afea289c007f1dfdb8df9aa90f0ce5f622f5397b 100644 (file)
@@ -28,14 +28,24 @@ import java.net.InetSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
  * start method that will handle sockets in different thread.
  */
 public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
 
+       protected interface PipelineInitializer<S extends ProtocolSession<?>> {
+               /**
+                * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
+                * method needs to be implemented in protocol specific Dispatchers.
+                * 
+                * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
+                * @param promise to be passed to {@link SessionNegotiatorFactory}
+                */
+               public void initializeChannel(SocketChannel channel, Promise<S> promise);
+       }
+
+
        private static final Logger logger = LoggerFactory.getLogger(AbstractDispatcher.class);
 
        private final EventLoopGroup bossGroup;
@@ -48,24 +58,15 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
                this.workerGroup = new NioEventLoopGroup();
        }
 
-       /**
-        * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
-        * method needs to be implemented in protocol specific Dispatchers.
-        * 
-        * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
-        * @param promise to be passed to {@link SessionNegotiatorFactory}
-        */
-       public abstract void initializeChannel(SocketChannel channel, Promise<S> promise, final SessionListenerFactory<L> lfactory);
-
        /**
         * Creates server. Each server needs factories to pass their instances to client sessions.
         * 
         * @param address address to which the server should be bound
+        * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
         * 
         * @return ChannelFuture representing the binding process
         */
-       @VisibleForTesting
-       public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<L> lfactory) {
+       protected ChannelFuture createServer(final InetSocketAddress address, final PipelineInitializer<S> initializer) {
                final ServerBootstrap b = new ServerBootstrap();
                b.group(this.bossGroup, this.workerGroup);
                b.channel(NioServerSocketChannel.class);
@@ -74,7 +75,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
 
                        @Override
                        protected void initChannel(final SocketChannel ch) throws Exception {
-                               initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE), lfactory);
+                               initializer.initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE));
                        }
                });
                b.childOption(ChannelOption.SO_KEEPALIVE, true);
@@ -95,9 +96,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
         * @return Future representing the connection process. Its result represents the combined success of TCP connection
         *         as well as session negotiation.
         */
-       @VisibleForTesting
-       public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
-                       final SessionListenerFactory<L> lfactory) {
+       protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
                final Bootstrap b = new Bootstrap();
                final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(address, strategy, b);
                b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
@@ -105,7 +104,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
 
                                        @Override
                                        protected void initChannel(final SocketChannel ch) throws Exception {
-                                               initializeChannel(ch, p, lfactory);
+                                               initializer.initializeChannel(ch, p);
                                        }
                                });
                p.connect();
@@ -124,9 +123,9 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
         *         success if it indicates no further attempts should be made and failure if it reports an error
         */
        protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
-                       final ReconnectStrategy reestablishStrategy, final SessionListenerFactory<L> lfactory) {
+                       final ReconnectStrategy reestablishStrategy, final PipelineInitializer<S> initializer) {
 
-               final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, lfactory);
+               final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, initializer);
                p.connect();
 
                return p;
index 508737866a34665d8f7527b579fd52d91ed2ca84..4665f901ded58edbab0d56764e1eae5a17fc67d7 100644 (file)
@@ -13,6 +13,8 @@ import io.netty.util.concurrent.FutureListener;
 
 import java.net.InetSocketAddress;
 
+import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
+
 import com.google.common.base.Preconditions;
 
 final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
@@ -20,18 +22,18 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
        private final InetSocketAddress address;
        private final ReconnectStrategyFactory strategyFactory;
        private final ReconnectStrategy strategy;
+       private final PipelineInitializer<S> initializer;
        private Future<?> pending;
-       private final SessionListenerFactory<L> lfactory;
 
        public ReconnectPromise(final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
                        final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
-                       final SessionListenerFactory<L> lfactory) {
+                       final PipelineInitializer<S> initializer) {
 
                this.dispatcher = Preconditions.checkNotNull(dispatcher);
                this.address = Preconditions.checkNotNull(address);
                this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
                this.strategy = Preconditions.checkNotNull(reestablishStrategy);
-               this.lfactory = Preconditions.checkNotNull(lfactory);
+               this.initializer = Preconditions.checkNotNull(initializer);
        }
 
        synchronized void connect() {
@@ -62,7 +64,7 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
                        }
                };
 
-               final Future<S> cf = this.dispatcher.createClient(this.address, rs, this.lfactory);
+               final Future<S> cf = this.dispatcher.createClient(this.address, rs, this.initializer);
 
                final Object lock = this;
                this.pending = cf;
index 76acac8c911a58c45e3e7dc36ae45a4d536e99cf..e605c135761a52386e3733f7f656502ef6bf829c 100644 (file)
@@ -28,7 +28,7 @@ import org.junit.Test;
 public class ServerTest {
        public static final int PORT = 18080;
 
-       AbstractDispatcher<?, SimpleSessionListener> clientDispatcher, dispatcher;
+       SimpleDispatcher clientDispatcher, dispatcher;
 
        final SimpleSessionListener pce = new SimpleSessionListener();
 
@@ -42,7 +42,7 @@ public class ServerTest {
        public void testConnectionEstablished() throws Exception {
                final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
 
-               this.dispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+               this.dispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
 
                        @Override
                        public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
@@ -61,7 +61,7 @@ public class ServerTest {
 
                this.server.get();
 
-               this.clientDispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+               this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
                        @Override
                        public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
                                        final Channel channel, final Promise<SimpleSession> promise) {
@@ -71,11 +71,11 @@ public class ServerTest {
 
                this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress,
                                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
-                                       @Override
-                                       public SimpleSessionListener getSessionListener() {
-                                               return new SimpleSessionListener();
-                                       }
-                               }).get(6, TimeUnit.SECONDS);
+                       @Override
+                       public SimpleSessionListener getSessionListener() {
+                               return new SimpleSessionListener();
+                       }
+               }).get(6, TimeUnit.SECONDS);
 
                assertEquals(true, p.get(3, TimeUnit.SECONDS));
        }
@@ -84,7 +84,7 @@ public class ServerTest {
        public void testConnectionFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException {
                final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
 
-               this.dispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+               this.dispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
 
                        @Override
                        public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
@@ -103,7 +103,7 @@ public class ServerTest {
 
                this.server.get();
 
-               this.clientDispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+               this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
                        @Override
                        public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
                                        final Channel channel, final Promise<SimpleSession> promise) {
@@ -113,19 +113,19 @@ public class ServerTest {
 
                this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress,
                                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
-                                       @Override
-                                       public SimpleSessionListener getSessionListener() {
-                                               return new SimpleSessionListener();
-                                       }
-                               }).get(6, TimeUnit.SECONDS);
+                       @Override
+                       public SimpleSessionListener getSessionListener() {
+                               return new SimpleSessionListener();
+                       }
+               }).get(6, TimeUnit.SECONDS);
 
                final Future<?> session = this.clientDispatcher.createClient(this.serverAddress,
                                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
-                                       @Override
-                                       public SimpleSessionListener getSessionListener() {
-                                               return new SimpleSessionListener();
-                                       }
-                               });
+                       @Override
+                       public SimpleSessionListener getSessionListener() {
+                               return new SimpleSessionListener();
+                       }
+               });
                assertFalse(session.isSuccess());
        }
 
index dfaf0a2742c60e2aa0ca3b12ee53a2c5c45dcfa4..bbaf5ca4ac25363605a08d2ea4d8e42a77429c38 100644 (file)
@@ -1,32 +1,52 @@
 package org.opendaylight.protocol.framework;
 
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
 
+import java.net.InetSocketAddress;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
-public class SimpleDispatcher<M, S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends
-AbstractDispatcher<S, L> {
+public class SimpleDispatcher extends AbstractDispatcher<SimpleSession, SimpleSessionListener> {
 
        private static final Logger logger = LoggerFactory.getLogger(SimpleDispatcher.class);
 
-       private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
+       private final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory;
        private final ProtocolHandlerFactory<?> factory;
 
-       public SimpleDispatcher(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
-                       final Promise<S> promise) {
+       private final class SimplePipelineInitializer implements PipelineInitializer<SimpleSession> {
+               final SessionListenerFactory<SimpleSessionListener> listenerFactory;
+
+               SimplePipelineInitializer(final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+                       this.listenerFactory = Preconditions.checkNotNull(listenerFactory);
+               }
+
+               @Override
+               public void initializeChannel(final SocketChannel channel, final Promise<SimpleSession> promise) {
+                       channel.pipeline().addLast(factory.getDecoders());
+                       channel.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, channel, promise));
+                       channel.pipeline().addLast(factory.getEncoders());
+                       logger.debug("initialization completed for channel {}", channel);
+               }
+
+       }
+
+       public SimpleDispatcher(final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
+                       final Promise<SimpleSession> promise) {
                this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
                this.factory = Preconditions.checkNotNull(factory);
        }
 
-       @Override
-       public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> lfactory) {
-               ch.pipeline().addLast(this.factory.getDecoders());
-               ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(lfactory, ch, promise));
-               ch.pipeline().addLast(this.factory.getEncoders());
-               logger.debug("initialization completed for channel {}", ch);
+       public Future<SimpleSession> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+               return super.createClient(address, strategy, new SimplePipelineInitializer(listenerFactory));
+       }
+
+       public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+               return super.createServer(address, new SimplePipelineInitializer(listenerFactory));
        }
 }
index 8f2bcf4b2a0296f295dbe8578d3471b7f7dc7fc9..32f3edd6433f692218483337bec6eb20f2d7927e 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.protocol.pcep;
 
 import io.netty.channel.ChannelFuture;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.opendaylight.protocol.framework.SessionListenerFactory;
@@ -24,7 +23,6 @@ public interface PCEPDispatcher {
         * @param address to be bound with the server
         * @param listenerFactory to create listeners for clients
         * @return instance of PCEPServer
-        * @throws IOException if some IO error occurred
         */
        public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<PCEPSessionListener> listenerFactory);
 }
index 52cc2df3c642eb88725037036d4bc23ee14d988b..7740e10a87715b5d27c99624c538dca87aa1c419 100644 (file)
@@ -44,14 +44,13 @@ public class PCEPDispatcherImpl extends AbstractDispatcher<PCEPSessionImpl, PCEP
 
        @Override
        public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<PCEPSessionListener> listenerFactory) {
-               return super.createServer(address, listenerFactory);
-       }
-
-       @Override
-       public void initializeChannel(final SocketChannel ch, final Promise<PCEPSessionImpl> promise,
-                       final SessionListenerFactory<PCEPSessionListener> listenerFactory) {
-               ch.pipeline().addLast(this.hf.getDecoders());
-               ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(listenerFactory, ch, promise));
-               ch.pipeline().addLast(this.hf.getEncoders());
+               return super.createServer(address, new PipelineInitializer<PCEPSessionImpl>() {
+                       @Override
+                       public void initializeChannel(final SocketChannel ch, final Promise<PCEPSessionImpl> promise) {
+                               ch.pipeline().addLast(hf.getDecoders());
+                               ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(listenerFactory, ch, promise));
+                               ch.pipeline().addLast(hf.getEncoders());
+                       }
+               });
        }
 }
index 3f50a5689252b2bd2739388b031346aedb666c9e..d668e571adbcbd1f401ad825cc3c273e54684f6c 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.protocol.pcep.testtool;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 
@@ -20,6 +21,7 @@ import org.opendaylight.protocol.framework.AbstractDispatcher;
 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
 import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
 import org.opendaylight.protocol.framework.ProtocolSession;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.SessionListener;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
@@ -47,11 +49,15 @@ AbstractDispatcher<S, L> {
                this.factory = Preconditions.checkNotNull(factory);
        }
 
-       @Override
-       public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> listenerFactory) {
-               ch.pipeline().addLast(this.factory.getDecoders());
-               ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
-               ch.pipeline().addLast(this.factory.getEncoders());
+       public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final SessionListenerFactory<L> listenerFactory) {
+               return super.createClient(address, strategy, new PipelineInitializer<S>() {
+                       @Override
+                       public void initializeChannel(final SocketChannel ch, final Promise<S> promise) {
+                               ch.pipeline().addLast(factory.getDecoders());
+                               ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
+                               ch.pipeline().addLast(factory.getEncoders());
+                       }
+               });
        }
 
        public static void main(final String[] args) throws Exception {