BUG-7006: Unit-tests sometimes hangs during execution 07/48107/2
authorClaudio D. Gasparini <cgaspari@cisco.com>
Thu, 3 Nov 2016 10:53:33 +0000 (11:53 +0100)
committerClaudio D. Gasparini <cgaspari@cisco.com>
Tue, 8 Nov 2016 12:31:50 +0000 (13:31 +0100)
Fix sporadically hangs during unit-tests execution

Change-Id: Ia339af6d9181a7be8e4b3e6335bba250aaecd822
Signed-off-by: Claudio D. Gasparini <cgaspari@cisco.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractAddPathTest.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPDispatcherTest.java [new file with mode: 0644]
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AddPathAllPathsTest.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AddPathBasePathsTest.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AddPathNPathsTest.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImplTest.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/TestClientDispatcher.java [deleted file]
bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/BGPPeerBuilder.java

index f48173a8506d5a34878ed52e7c04edc7b0179258..d375bc0cdb69f5155dbfd44c42084a87c79290b3 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.protocol.bgp.rib.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import io.netty.bootstrap.Bootstrap;
@@ -69,10 +70,10 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
 
     @Override
     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry listener, final int retryTimer) {
-        return createClient(remoteAddress, listener, retryTimer, createClientBootStrap(Optional.<KeyMapping>absent(), this.workerGroup));
+        return createClient(remoteAddress, listener, retryTimer, createClientBootStrap(Optional.absent(), false));
     }
 
-    private Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry listener, final int retryTimer,
+    private synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry listener, final int retryTimer,
             final Bootstrap clientBootStrap) {
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(listener);
         final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf);
@@ -84,14 +85,15 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         return sessionPromise;
     }
 
-    public Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress, final InetSocketAddress remoteAddress,
-            final BGPPeerRegistry strictBGPPeerRegistry, final int retryTimer) {
-        final Bootstrap clientBootStrap = createClientBootStrap(Optional.<KeyMapping>absent(), this.workerGroup);
+    @VisibleForTesting
+    public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress, final InetSocketAddress remoteAddress,
+            final BGPPeerRegistry strictBGPPeerRegistry, final int retryTimer, final boolean reuseAddress) {
+        final Bootstrap clientBootStrap = createClientBootStrap(Optional.absent(), reuseAddress);
         clientBootStrap.localAddress(localAddress);
         return createClient(remoteAddress, strictBGPPeerRegistry, retryTimer, clientBootStrap);
     }
 
-    protected Bootstrap createClientBootStrap(final Optional<KeyMapping> keys, final EventLoopGroup workerGroup) {
+    private synchronized Bootstrap createClientBootStrap(final Optional<KeyMapping> keys, final boolean reuseAddress) {
         final Bootstrap bootstrap = new Bootstrap();
         if (Epoll.isAvailable()) {
             bootstrap.channel(EpollSocketChannel.class);
@@ -112,16 +114,17 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
         bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
         bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
+        bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
 
         if (bootstrap.group() == null) {
-            bootstrap.group(workerGroup);
+            bootstrap.group(this.workerGroup);
         }
 
         return bootstrap;
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
         if (Epoll.isAvailable()) {
             this.workerGroup.shutdownGracefully().awaitUninterruptibly();
             this.bossGroup.shutdownGracefully().awaitUninterruptibly();
@@ -131,26 +134,32 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
     @Override
     public synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry peerRegistry,
             final int retryTimer, final Optional<KeyMapping> keys) {
+        return createReconnectingClient(remoteAddress, peerRegistry, retryTimer, keys, null, false);
+    }
+
+    @VisibleForTesting
+    protected synchronized Future<Void> createReconnectingClient(final InetSocketAddress remoteAddress, final BGPPeerRegistry peerRegistry,
+        final int retryTimer, final Optional<KeyMapping> keys, final InetSocketAddress localAddress, final boolean reuseAddress) {
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(peerRegistry);
-        final Bootstrap bootstrap = createClientBootStrap(keys, this.workerGroup);
+        final Bootstrap bootstrap = createClientBootStrap(keys, reuseAddress);
+        bootstrap.localAddress(localAddress);
         final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, remoteAddress,
-                retryTimer, bootstrap, peerRegistry, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf));
+            retryTimer, bootstrap, peerRegistry, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf));
         reconnectPromise.connect();
         return reconnectPromise;
     }
 
     @Override
-    public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress localAddress) {
+    public synchronized ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress serverAddress) {
         final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(registry);
         final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf);
-        final ServerBootstrap serverBootstrap = createServerBootstrap(initializer, this.bossGroup, this.workerGroup);
-        final ChannelFuture channelFuture = serverBootstrap.bind(localAddress);
-        LOG.debug("Initiated server {} at {}.", channelFuture, localAddress);
+        final ServerBootstrap serverBootstrap = createServerBootstrap(initializer);
+        final ChannelFuture channelFuture = serverBootstrap.bind(serverAddress);
+        LOG.debug("Initiated server {} at {}.", channelFuture, serverAddress);
         return channelFuture;
     }
 
-    public static ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer,
-            final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+    private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
         final ServerBootstrap serverBootstrap = new ServerBootstrap();
         if (Epoll.isAvailable()) {
             serverBootstrap.channel(EpollServerSocketChannel.class);
@@ -161,7 +170,7 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer);
         serverBootstrap.childHandler(serverChannelHandler);
 
-        serverBootstrap.option(ChannelOption.SO_BACKLOG, Integer.valueOf(SOCKET_BACKLOG_SIZE));
+        serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
         serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
         serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
@@ -170,19 +179,19 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         serverBootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
 
         if (serverBootstrap.group() == null) {
-            serverBootstrap.group(bossGroup, workerGroup);
+            serverBootstrap.group(this.bossGroup, this.workerGroup);
         }
         return serverBootstrap;
     }
 
-    public static final class BGPChannel {
+    private static final class BGPChannel {
         private static final String NEGOTIATOR = "negotiator";
 
         private BGPChannel() {
 
         }
 
-        public static <S extends BGPSession, T extends BGPSessionNegotiatorFactory> ChannelPipelineInitializer
+        static <T extends BGPSessionNegotiatorFactory> ChannelPipelineInitializer
         createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) {
             return (channel, promise) -> {
                 channel.pipeline().addLast(hf.getDecoders());
@@ -191,7 +200,7 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
             };
         }
 
-        public static <S extends BGPSession> ChannelHandler createClientChannelHandler(final ChannelPipelineInitializer initializer, final Promise<S> promise) {
+        static <S extends BGPSession> ChannelHandler createClientChannelHandler(final ChannelPipelineInitializer initializer, final Promise<S> promise) {
             return new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(final SocketChannel channel) {
@@ -200,7 +209,7 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
             };
         }
 
-        public static ChannelHandler createServerChannelHandler(final ChannelPipelineInitializer initializer) {
+        static ChannelHandler createServerChannelHandler(final ChannelPipelineInitializer initializer) {
             return new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(final SocketChannel channel) {
index 313ced90deb32bc1cc34e079bbb95eb58436ab8b..acc73c40f9e5566a7d10cadd8aec915d066c1dd4 100644 (file)
@@ -9,21 +9,16 @@ package org.opendaylight.protocol.bgp.rib.impl;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
+import static org.opendaylight.protocol.bgp.rib.impl.CheckUtil.waitFutureSuccess;
 import static org.opendaylight.protocol.bgp.rib.spi.RouterIds.createPeerId;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.net.InetAddresses;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
 import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -44,6 +39,7 @@ import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.protocol.bgp.inet.RIBActivator;
+import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl;
 import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
 import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderContext;
@@ -81,6 +77,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.mess
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.update.message.WithdrawnRoutesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.Attributes1;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.Attributes1Builder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.BgpTableType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1Builder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.SendReceive;
@@ -95,6 +92,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.mult
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.BgpRib;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.PeerId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.PeerRole;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.rib.Peer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.BgpId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.BgpOrigin;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.ClusterIdentifier;
@@ -112,6 +111,8 @@ import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 class AbstractAddPathTest extends AbstractDataBrokerTest {
+    protected static final List<BgpTableType> TABLES = ImmutableList.of(new BgpTableTypeImpl(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class));
+    protected static final TablesKey TABLE_KEY = new TablesKey(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
     static final String RIB_ID = "127.0.0.1";
     static final Ipv4Address PEER1 = new Ipv4Address("127.0.0.2");
     static final Ipv4Address PEER2 = new Ipv4Address("127.0.0.3");
@@ -199,15 +200,14 @@ class AbstractAddPathTest extends AbstractDataBrokerTest {
         this.bgpActivator.close();
     }
 
-    void sendRouteAndCheckIsOnLocRib(final Channel session, final Ipv4Prefix prefix, final long localPreference, final int expectedRoutesOnDS)
+    void sendRouteAndCheckIsOnLocRib(final BGPSessionImpl session, final Ipv4Prefix prefix, final long localPreference, final int expectedRoutesOnDS)
         throws InterruptedException, ExecutionException {
         session.writeAndFlush(createSimpleUpdate(prefix, null, null, localPreference));
         Thread.sleep(2000);
         checkLocRib(expectedRoutesOnDS);
-
     }
 
-    void sendWithdrawalRouteAndCheckIsOnLocRib(final Channel session, final Ipv4Prefix prefix, final long localPreference, final int expectedRoutesOnDS)
+    void sendWithdrawalRouteAndCheckIsOnLocRib(final BGPSessionImpl session, final Ipv4Prefix prefix, final long localPreference, final int expectedRoutesOnDS)
         throws InterruptedException, ExecutionException {
         session.writeAndFlush(createSimpleWithdrawalUpdate(prefix, localPreference));
         Thread.sleep(2000);
@@ -233,28 +233,14 @@ class AbstractAddPathTest extends AbstractDataBrokerTest {
         Assert.assertEquals(numberOfPeers, bgpRib.getRib().get(0).getPeer().size());
     }
 
-    Channel createPeerSession(final Ipv4Address peer, final PeerRole peerRole, final BgpParameters nonAddPathParams, final RIBImpl ribImpl,
-        final BGPHandlerFactory hf, final SimpleSessionListener sessionListsner) throws InterruptedException, ExecutionException {
+    BGPSessionImpl createPeerSession(final Ipv4Address peer, final PeerRole peerRole, final BgpParameters nonAddPathParams, final RIBImpl ribImpl,
+        final BGPHandlerFactory hf, final SimpleSessionListener sessionListener) throws InterruptedException, ExecutionException {
         configurePeer(peer, ribImpl, nonAddPathParams, peerRole);
-        return connectPeer(peer, nonAddPathParams, this.dispatcher, hf, sessionListsner);
+        return connectPeer(peer, nonAddPathParams, this.dispatcher, hf, sessionListener);
     }
 
-    private static ChannelFuture createClient(final BGPDispatcherImpl dispatcher, final InetSocketAddress remoteAddress,
-        final BGPPeerRegistry registry, final InetSocketAddress localAddress, final BGPHandlerFactory hf) throws InterruptedException {
-        final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(registry);
-
-        final Bootstrap bootstrap = dispatcher.createClientBootStrap(Optional.absent(), Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup());
-        bootstrap.localAddress(localAddress);
-        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
-        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-            @Override
-            protected void initChannel(final SocketChannel ch) throws Exception {
-                ch.pipeline().addLast(hf.getDecoders());
-                ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(ch, new DefaultPromise<>(ch.eventLoop())));
-                ch.pipeline().addLast(hf.getEncoders());
-            }
-        });
-        return bootstrap.connect(remoteAddress).sync();
+    private static int getPeerRibOutSize(final Peer peer) {
+        return ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bgp.rib.rib.peer.adj.rib.out.tables.routes.Ipv4RoutesCase) peer.getAdjRibOut().getTables().get(0).getRoutes()).getIpv4Routes().getIpv4Route().size();
     }
 
     private static void configurePeer(final Ipv4Address localAddress, final RIBImpl ribImpl, final BgpParameters bgpParameters, final PeerRole peerRole) {
@@ -263,21 +249,23 @@ class AbstractAddPathTest extends AbstractDataBrokerTest {
         final BGPPeer bgpPeer = new BGPPeer(inetAddress.getHostAddress(), ribImpl, peerRole, null);
         final List<BgpParameters> tlvs = Lists.newArrayList(bgpParameters);
         StrictBGPPeerRegistry.GLOBAL.addPeer(new IpAddress(new Ipv4Address(inetAddress.getHostAddress())), bgpPeer,
-            new BGPSessionPreferences(AS_NUMBER, HOLDTIMER, new BgpId(RIB_ID),
-                AS_NUMBER, tlvs, Optional.absent()));
+            new BGPSessionPreferences(AS_NUMBER, HOLDTIMER, new BgpId(RIB_ID), AS_NUMBER, tlvs, Optional.absent()));
         bgpPeer.instantiateServiceInstance();
     }
 
-    private static Channel connectPeer(final Ipv4Address localAddress, final BgpParameters bgpParameters,
-        final BGPDispatcherImpl dispatcherImpl, final BGPHandlerFactory hf, final BGPSessionListener sessionListsner) throws InterruptedException {
+    private static BGPSessionImpl connectPeer(final Ipv4Address localAddress, final BgpParameters bgpParameters,
+        final BGPDispatcherImpl dispatcherImpl, final BGPHandlerFactory hf, final BGPSessionListener sessionListener) throws InterruptedException {
         final BGPPeerRegistry peerRegistry = new StrictBGPPeerRegistry();
-        peerRegistry.addPeer(new IpAddress(new Ipv4Address(RIB_ID)), sessionListsner,
+        peerRegistry.addPeer(new IpAddress(new Ipv4Address(RIB_ID)), sessionListener,
             new BGPSessionPreferences(AS_NUMBER, HOLDTIMER, new BgpId(localAddress),
                 AS_NUMBER, Lists.newArrayList(bgpParameters), Optional.absent()));
 
-        final ChannelFuture createClient = createClient(dispatcherImpl, new InetSocketAddress(RIB_ID, PORT), peerRegistry, new InetSocketAddress(localAddress.getValue(), PORT), hf);
+        final Future<BGPSessionImpl> future = dispatcherImpl.createClient(new InetSocketAddress(localAddress.getValue(), PORT),
+            new InetSocketAddress(RIB_ID, PORT), peerRegistry, 2, true);
         Thread.sleep(1000);
-        return createClient.channel();
+        waitFutureSuccess(future);
+        final BGPSessionImpl client = future.getNow();
+        return client;
     }
 
     protected static BgpParameters createParameter(final boolean addPath) {
diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPDispatcherTest.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPDispatcherTest.java
new file mode 100644 (file)
index 0000000..8f3eb0c
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl;
+
+import static org.opendaylight.protocol.bgp.rib.impl.CheckUtil.waitFutureSuccess;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
+import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil;
+import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl;
+import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
+import org.opendaylight.protocol.bgp.parser.spi.pojo.ServiceLoaderBGPExtensionProviderContext;
+import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
+import org.opendaylight.protocol.util.InetSocketAddressUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.BgpParameters;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.BgpParametersBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.bgp.parameters.OptionalCapabilities;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.bgp.parameters.OptionalCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.bgp.parameters.optional.capabilities.CParametersBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.bgp.parameters.optional.capabilities.c.parameters.As4BytesCapabilityBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.BgpTableType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1Builder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.MultiprotocolCapabilityBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.BgpId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv4AddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
+import org.slf4j.LoggerFactory;
+
+public class AbstractBGPDispatcherTest {
+    protected static final AsNumber AS_NUMBER = new AsNumber(30L);
+    protected static final int RETRY_TIMER = 1;
+    protected static final BgpTableType IPV_4_TT = new BgpTableTypeImpl(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
+    private static final short HOLD_TIMER = 30;
+    protected BGPDispatcherImpl clientDispatcher;
+    protected BGPPeerRegistry registry;
+    protected SimpleSessionListener clientListener;
+    protected BGPDispatcherImpl serverDispatcher;
+    protected SimpleSessionListener serverListener;
+    protected InetSocketAddress clientAddress;
+    private EventLoopGroup boss;
+    private EventLoopGroup worker;
+
+    @Before
+    public void setUp() throws BGPDocumentedException {
+        if (!Epoll.isAvailable()) {
+            this.boss = new NioEventLoopGroup();
+            this.worker = new NioEventLoopGroup();
+        }
+        this.registry = new StrictBGPPeerRegistry();
+        this.clientListener = new SimpleSessionListener();
+        this.serverListener = new SimpleSessionListener();
+        final BGPExtensionProviderContext ctx = ServiceLoaderBGPExtensionProviderContext.getSingletonInstance();
+        this.serverDispatcher = new BGPDispatcherImpl(ctx.getMessageRegistry(), this.boss, this.worker);
+
+        this.clientAddress = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress();
+        final IpAddress clientPeerIp = new IpAddress(new Ipv4Address(clientAddress.getAddress().getHostAddress()));
+        this.registry.addPeer(clientPeerIp, this.clientListener, createPreferences(clientAddress));
+        this.clientDispatcher = new BGPDispatcherImpl(ctx.getMessageRegistry(), this.boss, this.worker);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        this.serverDispatcher.close();
+        this.registry.close();
+        if (!Epoll.isAvailable()) {
+            this.worker.shutdownGracefully().awaitUninterruptibly();
+            this.boss.shutdownGracefully().awaitUninterruptibly();
+        }
+    }
+
+    private void configureClient(final BGPExtensionProviderContext ctx) {
+        final InetSocketAddress clientAddress = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress();
+        final IpAddress clientPeerIp = new IpAddress(new Ipv4Address(clientAddress.getAddress().getHostAddress()));
+        this.registry.addPeer(clientPeerIp, this.clientListener, createPreferences(clientAddress));
+        this.clientDispatcher = new BGPDispatcherImpl(ctx.getMessageRegistry(), this.boss, this.worker);
+        if (!Epoll.isAvailable()) {
+            this.worker.shutdownGracefully().awaitUninterruptibly();
+            this.boss.shutdownGracefully().awaitUninterruptibly();
+        }
+    }
+
+    protected BGPSessionPreferences createPreferences(final InetSocketAddress socketAddress) {
+        final List<BgpParameters> tlvs = Lists.newArrayList();
+        final List<OptionalCapabilities> capas = Lists.newArrayList();
+        capas.add(new OptionalCapabilitiesBuilder().setCParameters(new CParametersBuilder().addAugmentation(
+            CParameters1.class, new CParameters1Builder().setMultiprotocolCapability(new MultiprotocolCapabilityBuilder()
+                .setAfi(IPV_4_TT.getAfi()).setSafi(IPV_4_TT.getSafi()).build()).build())
+            .setAs4BytesCapability(new As4BytesCapabilityBuilder().setAsNumber(new AsNumber(30L)).build())
+            .build()).build());
+        capas.add(new OptionalCapabilitiesBuilder().setCParameters(BgpExtendedMessageUtil.EXTENDED_MESSAGE_CAPABILITY).build());
+        tlvs.add(new BgpParametersBuilder().setOptionalCapabilities(capas).build());
+        final BgpId bgpId = new BgpId(new Ipv4Address(socketAddress.getAddress().getHostAddress()));
+        return new BGPSessionPreferences(AS_NUMBER, HOLD_TIMER, bgpId, AS_NUMBER, tlvs, Optional.absent());
+    }
+
+    public static void checkIdleState(final SimpleSessionListener listener) {
+        Stopwatch sw = Stopwatch.createStarted();
+        while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
+            if (BGPSessionImpl.State.IDLE != listener.getState()) {
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            } else {
+                return;
+            }
+        }
+        Assert.fail();
+    }
+
+    protected Channel createServer(final InetSocketAddress serverAddress) throws InterruptedException {
+        this.registry.addPeer(new IpAddress(new Ipv4Address(serverAddress.getAddress().getHostAddress())), this.serverListener, createPreferences(serverAddress));
+        LoggerFactory.getLogger(AbstractBGPDispatcherTest.class).info("createServer");
+        final ChannelFuture future = this.serverDispatcher.createServer(this.registry, serverAddress);
+        future.addListener(future1 -> Preconditions.checkArgument(future1.isSuccess(), "Unable to start bgp server on %s", future1.cause()));
+        waitFutureSuccess(future);
+        return future.channel();
+    }
+}
index 293904b9e95a88833bc72a22218a78e60a77714f..6fb2496f645ff242a3bb6e7e7bb62388cf473ef3 100644 (file)
@@ -11,7 +11,6 @@ import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import io.netty.channel.Channel;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -62,13 +61,13 @@ public class AddPathAllPathsTest extends AbstractAddPathTest {
         final BgpParameters nonAddPathParams = createParameter(false);
         final BgpParameters addPathParams = createParameter(true);
 
-        final Channel session1 = createPeerSession(PEER1, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
-        final Channel session2 = createPeerSession(PEER2, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
-        final Channel session3 = createPeerSession(PEER3, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
+        final BGPSessionImpl session1 = createPeerSession(PEER1, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
+        final BGPSessionImpl session2 = createPeerSession(PEER2, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
+        final BGPSessionImpl session3 = createPeerSession(PEER3, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
         final SimpleSessionListener listener4 = new SimpleSessionListener();
-        final Channel session4 = createPeerSession(PEER4, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener4);
+        final BGPSessionImpl session4 = createPeerSession(PEER4, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener4);
         final SimpleSessionListener listener5 = new SimpleSessionListener();
-        final Channel session5 = createPeerSession(PEER5, PeerRole.RrClient, addPathParams, ribImpl, hf, listener5);
+        final BGPSessionImpl session5 = createPeerSession(PEER5, PeerRole.RrClient, addPathParams, ribImpl, hf, listener5);
         Thread.sleep(1000);
         checkPeersPresentOnDataStore(5);
 
@@ -79,7 +78,7 @@ public class AddPathAllPathsTest extends AbstractAddPathTest {
         assertEquals(UPD_100, listener5.getListMsg().get(0));
 
         final SimpleSessionListener listener6 = new SimpleSessionListener();
-        final Channel session6 = createPeerSession(PEER6, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener6);
+        final BGPSessionImpl session6 = createPeerSession(PEER6, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener6);
         Thread.sleep(1000);
         checkPeersPresentOnDataStore(6);
         assertEquals(1, listener6.getListMsg().size());
index 9b98513912fe995a7280eba3f212c98ecfc69760..365f3f351a1b6f0a9a6009a9487a2f476b546a8b 100644 (file)
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import io.netty.channel.Channel;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -59,13 +58,13 @@ public class AddPathBasePathsTest extends AbstractAddPathTest {
         final BGPHandlerFactory hf = new BGPHandlerFactory(this.context.getMessageRegistry());
         final BgpParameters nonAddPathParams = createParameter(false);
 
-        final Channel session1 = createPeerSession(PEER1, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
-        final Channel session2 = createPeerSession(PEER2, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
-        final Channel session3 = createPeerSession(PEER3, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
+        final BGPSessionImpl session1 = createPeerSession(PEER1, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
+        final BGPSessionImpl session2 = createPeerSession(PEER2, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
+        final BGPSessionImpl session3 = createPeerSession(PEER3, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
         final SimpleSessionListener listener4 = new SimpleSessionListener();
-        final Channel session4 = createPeerSession(PEER4, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener4);
+        final BGPSessionImpl session4 = createPeerSession(PEER4, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener4);
         final SimpleSessionListener listener5 = new SimpleSessionListener();
-        final Channel session5 = createPeerSession(PEER5, PeerRole.Ebgp, nonAddPathParams, ribImpl, hf, listener5);
+        final BGPSessionImpl session5 = createPeerSession(PEER5, PeerRole.Ebgp, nonAddPathParams, ribImpl, hf, listener5);
         Thread.sleep(1000);
         checkPeersPresentOnDataStore(5);
 
@@ -89,7 +88,7 @@ public class AddPathBasePathsTest extends AbstractAddPathTest {
         assertEquals(UPD_NA_200_EBGP, listener5.getListMsg().get(1));
 
         final SimpleSessionListener listener6 = new SimpleSessionListener();
-        final Channel session6 = createPeerSession(PEER6, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener6);
+        final BGPSessionImpl session6 = createPeerSession(PEER6, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener6);
         Thread.sleep(1000);
         checkPeersPresentOnDataStore(6);
         assertEquals(1, listener6.getListMsg().size());
index e154e463e84f7d55615ad0cfe2a3d00e72f3ec5a..ebc44045512135b5454ebae452b87136ae0344ca 100644 (file)
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import io.netty.channel.Channel;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -61,13 +60,13 @@ public class AddPathNPathsTest extends AbstractAddPathTest {
         final BgpParameters nonAddPathParams = createParameter(false);
         final BgpParameters addPathParams = createParameter(true);
 
-        final Channel session1 = createPeerSession(PEER1, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
-        final Channel session2 = createPeerSession(PEER2, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
-        final Channel session3 = createPeerSession(PEER3, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
+        final BGPSessionImpl session1 = createPeerSession(PEER1, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
+        final BGPSessionImpl session2 = createPeerSession(PEER2, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
+        final BGPSessionImpl session3 = createPeerSession(PEER3, PeerRole.Ibgp, nonAddPathParams, ribImpl, hf, new SimpleSessionListener());
         final SimpleSessionListener listener4 = new SimpleSessionListener();
-        final Channel session4 = createPeerSession(PEER4, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener4);
+        final BGPSessionImpl session4 = createPeerSession(PEER4, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener4);
         final SimpleSessionListener listener5 = new SimpleSessionListener();
-        final Channel session5 = createPeerSession(PEER5, PeerRole.RrClient, addPathParams, ribImpl, hf, listener5);
+        final BGPSessionImpl session5 = createPeerSession(PEER5, PeerRole.RrClient, addPathParams, ribImpl, hf, listener5);
         Thread.sleep(1000);
         checkPeersPresentOnDataStore(5);
 
@@ -78,7 +77,7 @@ public class AddPathNPathsTest extends AbstractAddPathTest {
         assertEquals(UPD_100, listener5.getListMsg().get(0));
 
         final SimpleSessionListener listener6 = new SimpleSessionListener();
-        final Channel session6 = createPeerSession(PEER6, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener6);
+        final BGPSessionImpl session6 = createPeerSession(PEER6, PeerRole.RrClient, nonAddPathParams, ribImpl, hf, listener6);
         Thread.sleep(1000);
         checkPeersPresentOnDataStore(6);
         assertEquals(1, listener6.getListMsg().size());
index 0d7c0b07d547db9492d98e10ab26343948c4e152..1cd8282275986d3c91fb04b28d328b719b35e151 100755 (executable)
@@ -26,7 +26,6 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
@@ -65,12 +64,13 @@ public class BGPDispatcherImplTest {
     private static final int RETRY_TIMER = 1;
     private static final BgpTableType IPV_4_TT = new BgpTableTypeImpl(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
     private BGPDispatcherImpl serverDispatcher;
-    private TestClientDispatcher clientDispatcher;
+    private BGPDispatcherImpl clientDispatcher;
     private BGPPeerRegistry registry;
     private SimpleSessionListener clientListener;
     private SimpleSessionListener serverListener;
     private EventLoopGroup boss;
     private EventLoopGroup worker;
+    private InetSocketAddress clientAddress;
 
     @Before
     public void setUp() throws BGPDocumentedException {
@@ -88,13 +88,6 @@ public class BGPDispatcherImplTest {
         configureClient(ctx);
     }
 
-    static <T extends Future> void waitFutureSuccess(final T future) throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(1);
-        future.addListener(future1 -> latch.countDown());
-        Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
-    }
-
-
     public static void checkIdleState (final SimpleSessionListener listener){
         Stopwatch sw = Stopwatch.createStarted();
         while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
@@ -108,10 +101,10 @@ public class BGPDispatcherImplTest {
     }
 
     private void configureClient(final BGPExtensionProviderContext ctx) {
-        final InetSocketAddress clientAddress = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress();
-        final IpAddress clientPeerIp = new IpAddress(new Ipv4Address(clientAddress.getAddress().getHostAddress()));
-        this.registry.addPeer(clientPeerIp, this.clientListener, createPreferences(clientAddress));
-        this.clientDispatcher = new TestClientDispatcher(this.boss, this.worker, ctx.getMessageRegistry(), clientAddress);
+        this.clientAddress = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress();
+        final IpAddress clientPeerIp = new IpAddress(new Ipv4Address(this.clientAddress.getAddress().getHostAddress()));
+        this.registry.addPeer(clientPeerIp, this.clientListener, createPreferences(this.clientAddress));
+        this.clientDispatcher = new BGPDispatcherImpl(ctx.getMessageRegistry(), this.boss, this.worker);
     }
 
     private Channel createServer(final InetSocketAddress serverAddress) throws InterruptedException {
@@ -142,7 +135,7 @@ public class BGPDispatcherImplTest {
         final InetSocketAddress serverAddress = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress();
         final Channel serverChannel = createServer(serverAddress);
         Thread.sleep(1000);
-        final Future<BGPSessionImpl> futureClient = this.clientDispatcher.createClient(serverAddress, this.registry, 2, Optional.absent());
+        final Future<BGPSessionImpl> futureClient = this.clientDispatcher.createClient(this.clientAddress, serverAddress, this.registry, 2, true);
         waitFutureSuccess(futureClient);
         final BGPSessionImpl session = futureClient.get();
         Assert.assertEquals(BGPSessionImpl.State.UP, this.clientListener.getState());
@@ -159,7 +152,8 @@ public class BGPDispatcherImplTest {
     @Test
     public void testCreateReconnectingClient() throws Exception {
         final InetSocketAddress serverAddress = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress();
-        final Future<Void> future = this.clientDispatcher.createReconnectingClient(serverAddress, this.registry, RETRY_TIMER, Optional.absent());
+        final Future<Void> future = this.clientDispatcher.createReconnectingClient(serverAddress, this.registry, RETRY_TIMER, Optional.absent(),
+            this.clientAddress, true);
         waitFutureSuccess(future);
         final Channel serverChannel = createServer(serverAddress);
         Assert.assertEquals(BGPSessionImpl.State.UP, this.serverListener.getState());
diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/TestClientDispatcher.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/TestClientDispatcher.java
deleted file mode 100755 (executable)
index 715fd6d..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.protocol.bgp.rib.impl;
-
-import com.google.common.base.Optional;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.Future;
-import java.net.InetSocketAddress;
-import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry;
-import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
-import org.opendaylight.protocol.concepts.KeyMapping;
-
-public class TestClientDispatcher {
-
-    private final BGPHandlerFactory hf;
-    private final InetSocketAddress defaultAddress;
-    private InetSocketAddress localAddress;
-    private final BGPDispatcherImpl disp;
-
-    protected TestClientDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final MessageRegistry messageRegistry,
-            final InetSocketAddress localAddress) {
-        this.disp = new BGPDispatcherImpl(messageRegistry, bossGroup, workerGroup) {
-            @Override
-            protected Bootstrap createClientBootStrap(final Optional<KeyMapping> keys, final EventLoopGroup workerGroup) {
-                final Bootstrap bootstrap = new Bootstrap();
-                if (Epoll.isAvailable()) {
-                    bootstrap.channel(EpollSocketChannel.class);
-                } else {
-                    bootstrap.channel(NioSocketChannel.class);
-                }
-                // Make sure we are doing round-robin processing
-                bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
-                bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
-
-                if (bootstrap.group() == null) {
-                    bootstrap.group(workerGroup);
-                }
-                bootstrap.localAddress(localAddress);
-                bootstrap.option(ChannelOption.SO_REUSEADDR, true);
-                return bootstrap;
-            }
-        };
-        this.hf = new BGPHandlerFactory(messageRegistry);
-        this.localAddress = localAddress;
-        this.defaultAddress = localAddress;
-    }
-
-    public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress,
-            final BGPPeerRegistry listener, final int retryTimer, final Optional<InetSocketAddress> localAddress) {
-        setLocalAddress(localAddress);
-        return this.disp.createClient(remoteAddress, listener, retryTimer);
-    }
-
-    public synchronized Future<Void> createReconnectingClient(final InetSocketAddress address, final BGPPeerRegistry peerRegistry,
-            final int retryTimer, final Optional<InetSocketAddress> localAddress) {
-        setLocalAddress(localAddress);
-        return this.disp.createReconnectingClient(address, peerRegistry, retryTimer, Optional.<KeyMapping>absent());
-    }
-
-    private synchronized void setLocalAddress(final Optional<InetSocketAddress> localAddress) {
-        if (localAddress.isPresent()) {
-            this.localAddress = localAddress.get();
-        } else {
-            this.localAddress = this.defaultAddress;
-        }
-    }
-}
index f6c66b2a11d44a3dc9a8d8cffca66741728f686f..0ccae805e9a9602e215ac3ba9d7a94c5fcc5b586 100644 (file)
@@ -41,7 +41,7 @@ final class BGPPeerBuilder {
         if (arguments.getInitiateConnection()) {
             for (final InetSocketAddress remoteAddress : arguments.getRemoteAddresses()) {
                 strictBGPPeerRegistry.addPeer(StrictBGPPeerRegistry.getIpAddress(remoteAddress), sessionListener, proposal);
-                addFutureListener(localAddress, ((BGPDispatcherImpl) dispatcher).createClient(localAddress, remoteAddress, strictBGPPeerRegistry, RETRY_TIMER));
+                addFutureListener(localAddress, ((BGPDispatcherImpl) dispatcher).createClient(localAddress, remoteAddress, strictBGPPeerRegistry, RETRY_TIMER, true));
             }
         } else {
             for (final InetSocketAddress remoteAddress : arguments.getRemoteAddresses()) {