Fix promises not being associated with an executor 98/3498/1
authorRobert Varga <rovarga@cisco.com>
Thu, 5 Dec 2013 11:11:20 +0000 (12:11 +0100)
committerRobert Varga <rovarga@cisco.com>
Thu, 5 Dec 2013 11:11:20 +0000 (12:11 +0100)
Change-Id: I1b458a2e6539fb0226892b19c2a47bfbdc2d679c
Signed-off-by: Robert Varga <rovarga@cisco.com>
bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/BGPSpeakerMock.java
framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java
framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java
framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
pcep/testtool/src/test/java/org/opendaylight/protocol/pcep/testtool/PCCMock.java

index 8f7a63cea2f1bc9b551608fe3a1db2ba9c5fb7e2..243bde1c802c2a3ae87527cd3165f6d3b8a7fa9a 100644 (file)
@@ -40,7 +40,7 @@ public class BGPSpeakerMock<M, S extends ProtocolSession<M>, L extends SessionLi
 
        public BGPSpeakerMock(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final BGPHandlerFactory factory,
                        final DefaultPromise<BGPSessionImpl> defaultPromise) {
-               super(new NioEventLoopGroup(), new NioEventLoopGroup());
+               super(GlobalEventExecutor.INSTANCE, new NioEventLoopGroup(), new NioEventLoopGroup());
                this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
                this.factory = Preconditions.checkNotNull(factory);
        }
index ff135adaa64ead674adadc79d0d30b9faf24baec..e19bdbacfca46670d7c31f77210fed071a7e9acc 100644 (file)
@@ -18,6 +18,7 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
@@ -28,6 +29,8 @@ import java.net.InetSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
  * start method that will handle sockets in different thread.
@@ -52,6 +55,8 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
 
        private final EventLoopGroup workerGroup;
 
+       private final EventExecutor executor;
+
        /**
         * Internally creates new instances of NioEventLoopGroup, might deplete system resources and result in Too many open files exception.
         *
@@ -59,14 +64,20 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
         */
        @Deprecated
        protected AbstractDispatcher() {
-               this(new NioEventLoopGroup(),new NioEventLoopGroup());
+               this(GlobalEventExecutor.INSTANCE, new NioEventLoopGroup(),new NioEventLoopGroup());
        }
 
        protected AbstractDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
-               this.bossGroup = bossGroup;
-               this.workerGroup = workerGroup;
+               this(GlobalEventExecutor.INSTANCE, bossGroup, workerGroup);
        }
 
+       protected AbstractDispatcher(final EventExecutor executor, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+               this.bossGroup = Preconditions.checkNotNull(bossGroup);
+               this.workerGroup = Preconditions.checkNotNull(workerGroup);
+               this.executor = Preconditions.checkNotNull(executor);
+       }
+
+
        /**
         * Creates server. Each server needs factories to pass their instances to client sessions.
         *
@@ -84,7 +95,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
 
                        @Override
                        protected void initChannel(final SocketChannel ch) {
-                               initializer.initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE));
+                               initializer.initializeChannel(ch, new DefaultPromise<S>(executor));
                        }
                });
                b.childOption(ChannelOption.SO_KEEPALIVE, true);
@@ -107,7 +118,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
         */
        protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
                final Bootstrap b = new Bootstrap();
-               final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(address, strategy, b);
+               final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(executor, address, strategy, b);
                b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
                                new ChannelInitializer<SocketChannel>() {
 
@@ -134,7 +145,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
        protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
                        final ReconnectStrategy reestablishStrategy, final PipelineInitializer<S> initializer) {
 
-               final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, initializer);
+               final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, reestablishStrategy, initializer);
                p.connect();
 
                return p;
index 31db440222e6d712a141d3f0268db726ab3380c7..41898a53f5cbe5f3c1a0e1585dbcbbee23787764 100644 (file)
@@ -12,6 +12,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelOption;
 import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
@@ -36,7 +37,8 @@ final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends Default
        @GuardedBy("this")
        private Future<?> pending;
 
-       ProtocolSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap b) {
+       ProtocolSessionPromise(final EventExecutor executor, final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap b) {
+               super(executor);
                this.strategy = Preconditions.checkNotNull(strategy);
                this.address = Preconditions.checkNotNull(address);
                this.b = Preconditions.checkNotNull(b);
index 193c209900f67b1971a34cf3c202b5b2e8080f61..869dc1afc140a0ecad8c32d00d9e81994b829cbd 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.protocol.framework;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
@@ -32,10 +33,10 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
 
        private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
 
-       public ReconnectPromise(final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
+       public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
                        final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
                        final PipelineInitializer<S> initializer) {
-
+               super(executor);
                this.dispatcher = Preconditions.checkNotNull(dispatcher);
                this.address = Preconditions.checkNotNull(address);
                this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
@@ -43,7 +44,7 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
                this.initializer = Preconditions.checkNotNull(initializer);
        }
 
-       // TODO rafactor
+       // FIXME: BUG-190: refactor
 
        synchronized void connect() {
                negotiationFinished.set(false);
index 46ba54771c575d5f0bba1909cdd2979c48614aea..9e6ddf62b73005a3addc54f1aa599e157b6450e9 100644 (file)
@@ -43,7 +43,7 @@ public class PCCMock<M, S extends ProtocolSession<M>, L extends SessionListener<
 
        public PCCMock(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final PCEPHandlerFactory factory,
                        final DefaultPromise<PCEPSessionImpl> defaultPromise) {
-               super(new NioEventLoopGroup(), new NioEventLoopGroup());
+               super(GlobalEventExecutor.INSTANCE, new NioEventLoopGroup(), new NioEventLoopGroup());
                this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
                this.factory = Preconditions.checkNotNull(factory);
        }