BUG-4780: New peers failed to connect after one gets connected 10/31410/7
authorClaudio D. Gasparini <cgaspari@cisco.com>
Wed, 16 Dec 2015 08:55:16 +0000 (09:55 +0100)
committerMilos Fabian <milfabia@cisco.com>
Thu, 17 Dec 2015 13:36:40 +0000 (13:36 +0000)
After one peer gets connected, new peer failed to connect.
Cause of the issue was that we were reusing DefaultPromise
from previous connection.
Fix by creating new promise for each connection.
Remove needless EventExecutor variables, use
GlobalEventExecutor.INSTANCE instead.

Change-Id: I2938b9f16087d5b004fdbd43bcf082e26bb8a071
Signed-off-by: Claudio D. Gasparini <cgaspari@cisco.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPProtocolSessionPromise.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPReconnectPromise.java

index 3fcb966ed3a7153a45c643eba4c1fd5199411f17..6583d0fb944c941631392e869de48f6be40a2326 100644 (file)
@@ -21,7 +21,6 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
@@ -57,7 +56,6 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
     private final BGPHandlerFactory handlerFactory;
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
-    private final EventExecutor executor;
     private Optional<KeyMapping> keys;
 
     public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
@@ -67,7 +65,6 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
     public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final MD5ChannelFactory<?> cf, final MD5ServerChannelFactory<?> scf) {
         this.bossGroup = Preconditions.checkNotNull(bossGroup);
         this.workerGroup = Preconditions.checkNotNull(workerGroup);
-        this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE);
         this.handlerFactory = new BGPHandlerFactory(messageRegistry);
         this.channelFactory = cf;
         this.serverChannelFactory = scf;
@@ -80,8 +77,8 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf);
 
         final Bootstrap bootstrap = createClientBootStrap();
-        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
-        bootstrap.handler(BGPChannel.createChannelInitializer(initializer, sessionPromise));
+        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, strategy, bootstrap);
+        bootstrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
         sessionPromise.connect();
         LOG.debug("Client created.");
         return sessionPromise;
@@ -122,7 +119,7 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(peerRegistry);
         this.keys = keys;
         final Bootstrap bootstrap = createClientBootStrap();
-        final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE, address,
+        final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, address,
             connectStrategyFactory, bootstrap, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf));
         reconnectPromise.connect();
         this.keys = Optional.absent();
@@ -141,7 +138,9 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
 
     private ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
         final ServerBootstrap serverBootstrap = new ServerBootstrap();
-        serverBootstrap.childHandler(BGPChannel.createChannelInitializer(initializer, new DefaultPromise(BGPDispatcherImpl.this.executor)));
+        final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer);
+        serverBootstrap.childHandler(serverChannelHandler);
+
         serverBootstrap.option(ChannelOption.SO_BACKLOG, Integer.valueOf(SOCKET_BACKLOG_SIZE));
         serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
@@ -184,7 +183,7 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
             };
         }
 
-        public static <S extends BGPSession> ChannelHandler createChannelInitializer(final ChannelPipelineInitializer initializer, final Promise<S> promise) {
+        public static <S extends BGPSession> ChannelHandler createClientChannelHandler(final ChannelPipelineInitializer initializer, final Promise<S> promise) {
             return new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(final SocketChannel channel) {
@@ -192,5 +191,14 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
                 }
             };
         }
+
+        public static ChannelHandler createServerChannelHandler(final ChannelPipelineInitializer initializer) {
+            return new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(final SocketChannel channel) {
+                    initializer.initializeChannel(channel, new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
+                }
+            };
+        }
     }
 }
index 8a19b897a35287d1f3f3fa3597f9fae6d044aa99..b017962eeaa9237ced8d02352d66ceb88a1d2846 100644 (file)
@@ -13,9 +13,9 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelOption;
 import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import javax.annotation.concurrent.GuardedBy;
@@ -33,8 +33,8 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
     @GuardedBy("this")
     private Future<?> pending;
 
-    public BGPProtocolSessionPromise(EventExecutor executor, InetSocketAddress address, ReconnectStrategy strategy, Bootstrap bootstrap) {
-        super(executor);
+    public BGPProtocolSessionPromise(InetSocketAddress address, ReconnectStrategy strategy, Bootstrap bootstrap) {
+        super(GlobalEventExecutor.INSTANCE);
         this.strategy = Preconditions.checkNotNull(strategy);
         this.address = Preconditions.checkNotNull(address);
         this.bootstrap = Preconditions.checkNotNull(bootstrap);
index 1c84f2b3a9107541229aae7ef302ed940ca32200..bc250687cab9c5d86a7af37ed158b7317f9ed175 100644 (file)
@@ -34,14 +34,12 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     private final ReconnectStrategyFactory strategyFactory;
     private final Bootstrap bootstrap;
     private final ChannelPipelineInitializer initializer;
-    private final EventExecutor executor;
     private Future<S> pending;
 
     public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
                                final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap bootstrap,
                                final ChannelPipelineInitializer initializer) {
         super(executor);
-        this.executor = executor;
         this.bootstrap = bootstrap;
         this.initializer = Preconditions.checkNotNull(initializer);
         this.address = Preconditions.checkNotNull(address);
@@ -76,7 +74,7 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
 
     public Future<S> connectSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap,
                                   final ChannelPipelineInitializer initializer) {
-        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
+        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, strategy, bootstrap);
         final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(final SocketChannel channel) {