BUG-54 : switched channel pipeline to be protocol specific.
[bgpcep.git] / framework / src / main / java / org / opendaylight / protocol / framework / ProtocolSessionPromise.java
index b32859b018bfe394b1bdfb4fd22942d771e5fb97..abc253ef78d014851f051485ae9278bc1619fd18 100644 (file)
@@ -11,8 +11,6 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -29,9 +27,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 
 @ThreadSafe
-final class ProtocolSessionPromise<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends DefaultPromise<S> {
+final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
        private static final Logger logger = LoggerFactory.getLogger(ProtocolSessionPromise.class);
-       private final ChannelInitializerImpl<M, S, L> init;
        private final ReconnectStrategy strategy;
        private final InetSocketAddress address;
        private final Bootstrap b;
@@ -39,27 +36,22 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
        @GuardedBy("this")
        private Future<?> pending;
 
-       ProtocolSessionPromise(final EventLoopGroup workerGroup, final InetSocketAddress address, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
-                       final SessionListenerFactory<L> listenerFactory,
-                       final ProtocolHandlerFactory<?> protocolFactory, final ReconnectStrategy strategy) {
+       ProtocolSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap b) {
                this.strategy = Preconditions.checkNotNull(strategy);
                this.address = Preconditions.checkNotNull(address);
-
-               init = new ChannelInitializerImpl<M, S, L>(negotiatorFactory, listenerFactory, protocolFactory, this);
-               b = new Bootstrap();
-               b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(init);
+               this.b = Preconditions.checkNotNull(b);
        }
 
        synchronized void connect() {
                final Object lock = this;
 
                try {
-                       final int timeout = strategy.getConnectTimeout();
+                       final int timeout = this.strategy.getConnectTimeout();
 
                        logger.debug("Promise {} attempting connect for {}ms", lock, timeout);
 
-                       b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
-                       pending = b.connect(address).addListener(new ChannelFutureListener() {
+                       this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
+                       this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(final ChannelFuture cf) throws Exception {
                                        synchronized (lock) {
@@ -67,7 +59,7 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
                                                logger.debug("Promise {} connection resolved", lock);
 
                                                // Triggered when a connection attempt is resolved.
-                                               Preconditions.checkState(pending == cf);
+                                               Preconditions.checkState(ProtocolSessionPromise.this.pending == cf);
 
                                                /*
                                                 * The promise we gave out could have been cancelled,
@@ -87,13 +79,13 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
                                                }
 
                                                if (!cf.isSuccess()) {
-                                                       final Future<Void> rf = strategy.scheduleReconnect(cf.cause());
+                                                       final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
                                                        rf.addListener(new FutureListener<Void>() {
                                                                @Override
                                                                public void operationComplete(final Future<Void> sf) {
                                                                        synchronized (lock) {
                                                                                // Triggered when a connection attempt is to be made.
-                                                                               Preconditions.checkState(pending == sf);
+                                                                               Preconditions.checkState(ProtocolSessionPromise.this.pending == sf);
 
                                                                                /*
                                                                                 * The promise we gave out could have been cancelled,
@@ -114,14 +106,14 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
                                                                }
                                                        });
 
-                                                       pending = rf;
+                                                       ProtocolSessionPromise.this.pending = rf;
                                                } else {
                                                        logger.debug("Promise {} connection successful", lock);
                                                }
                                        }
                                }
                        });
-               } catch (Exception e) {
+               } catch (final Exception e) {
                        setFailure(e);
                }
        }
@@ -129,7 +121,7 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
        @Override
        public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
                if (super.cancel(mayInterruptIfRunning)) {
-                       pending.cancel(mayInterruptIfRunning);
+                       this.pending.cancel(mayInterruptIfRunning);
                        return true;
                }
 
@@ -139,7 +131,7 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
        @Override
        public synchronized Promise<S> setSuccess(final S result) {
                logger.debug("Promise {} completed", this);
-               strategy.reconnectSuccessful();
+               this.strategy.reconnectSuccessful();
                return super.setSuccess(result);
        }
-}
\ No newline at end of file
+}