Revert ignore test change introded by bug-6120.
Improve some test, removing thread.sleep were possible.
Change-Id: I084a015db8d4b6be2b210f8c5d6298aaddc9c69a
Signed-off-by: Claudio <cgaspari@cisco.com>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class BmpMockDispatcher {
+final class BmpMockDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(BmpMockDispatcher.class);
private static final int CONNECT_TIMEOUT = 2000;
private static final int MAX_CONNECTIONS_COUNT = 128;
- final BmpHandlerFactory hf;
+ private final BmpHandlerFactory hf;
private final BmpSessionFactory sessionFactory;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
- public BmpMockDispatcher(final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
+ BmpMockDispatcher(final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
this.sessionFactory = Preconditions.checkNotNull(sessionFactory);
Preconditions.checkNotNull(registry);
this.hf = new BmpHandlerFactory(registry);
private Bootstrap createClientInstance(final SocketAddress localAddress) {
final NioEventLoopGroup workergroup = new NioEventLoopGroup();
- final Bootstrap b = new Bootstrap();
+ final Bootstrap bootstrap = new Bootstrap();
- b.channel(NioSocketChannel.class);
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
- b.group(workergroup);
+ bootstrap.channel(NioSocketChannel.class);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+ bootstrap.group(workergroup);
- b.handler(new ChannelInitializer<NioSocketChannel>() {
+ bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(final NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
}
});
- b.localAddress(localAddress);
- return b;
+ bootstrap.localAddress(localAddress);
+ return bootstrap;
}
- public ChannelFuture createClient(final SocketAddress localAddress, final SocketAddress remoteAddress) {
+ ChannelFuture createClient(final SocketAddress localAddress, final SocketAddress remoteAddress) {
Preconditions.checkNotNull(localAddress);
Preconditions.checkNotNull(remoteAddress);
// ideally we should use Bootstrap clones here
- Bootstrap b = createClientInstance(localAddress);
- final ChannelFuture f = b.connect(remoteAddress);
+ final Bootstrap bootstrap = createClientInstance(localAddress);
+ final ChannelFuture channelFuture = bootstrap.connect(remoteAddress);
LOG.info("BMP client {} <--> {} deployed", localAddress, remoteAddress);
- return f;
+ return channelFuture;
}
private ServerBootstrap createServerInstance() {
- final ServerBootstrap b = new ServerBootstrap();
- b.childHandler(new ChannelInitializer<Channel>() {
+ final ServerBootstrap serverBootstrap = new ServerBootstrap();
+ serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(final Channel ch) throws Exception {
ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
}
});
- b.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
- b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- b.channel(NioServerSocketChannel.class);
- b.group(bossGroup, workerGroup);
- return b;
+ serverBootstrap.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
+ serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ serverBootstrap.channel(NioServerSocketChannel.class);
+ serverBootstrap.group(bossGroup, workerGroup);
+ return serverBootstrap;
}
- public ChannelFuture createServer(final InetSocketAddress localAddress) {
+ ChannelFuture createServer(final InetSocketAddress localAddress) {
Preconditions.checkNotNull(localAddress);
-
- ServerBootstrap b = createServerInstance();
- final ChannelFuture f = b.bind(localAddress);
+ final ServerBootstrap serverBootstrap = createServerInstance();
+ final ChannelFuture channelFuture = serverBootstrap.bind(localAddress);
LOG.info("Initiated BMP server at {}.", localAddress);
- return f;
+ return channelFuture;
}
}
package org.opendaylight.protocol.bmp.mock;
+import static org.opendaylight.protocol.bmp.mock.BmpMockTest.waitFutureSuccess;
+
import com.google.common.base.Optional;
import com.google.common.net.InetAddresses;
import io.netty.channel.Channel;
final BmpMockDispatcher dispatcher = new BmpMockDispatcher(this.registry, this.sessionFactory);
final int port = getRandomPort();
final BmpDispatcherImpl serverDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
- this.registry, this.sessionFactory);
- serverDispatcher.createServer(new InetSocketAddress(InetAddresses.forString("0.0.0.0"), port), this.slf, Optional.<KeyMapping>absent());
-
+ this.registry, this.sessionFactory);
+ final ChannelFuture futureServer = serverDispatcher.createServer(new InetSocketAddress(InetAddresses.forString("0.0.0.0"), port), this.slf, Optional.<KeyMapping>absent());
+ waitFutureSuccess(futureServer);
final ChannelFuture channelFuture = dispatcher.createClient(new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 0),
- new InetSocketAddress(InetAddresses.forString("127.0.0.3"), port));
+ new InetSocketAddress(InetAddresses.forString("127.0.0.3"), port));
+ waitFutureSuccess(channelFuture);
final Channel channel = channelFuture.sync().channel();
Assert.assertTrue(channel.isActive());
final BmpMockDispatcher dispatcher = new BmpMockDispatcher(this.registry, this.sessionFactory);
final int port = getRandomPort();
final BmpDispatcherImpl serverDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
- this.registry, this.sessionFactory);
- dispatcher.createServer(new InetSocketAddress(InetAddresses.forString("0.0.0.0"), port));
-
+ this.registry, this.sessionFactory);
+ final ChannelFuture futureServer = dispatcher.createServer(new InetSocketAddress(InetAddresses.forString("0.0.0.0"), port));
+ waitFutureSuccess(futureServer);
final ChannelFuture channelFuture = serverDispatcher.createClient(new InetSocketAddress(InetAddresses.forString("127.0.0.3"), port), this.slf, Optional.<KeyMapping>absent());
+ waitFutureSuccess(channelFuture);
final Channel channel = channelFuture.sync().channel();
Assert.assertTrue(channel.isActive());
package org.opendaylight.protocol.bmp.mock;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.protocol.bmp.api.BmpSession;
import org.opendaylight.protocol.bmp.api.BmpSessionListener;
import org.opendaylight.protocol.bmp.api.BmpSessionListenerFactory;
-import org.opendaylight.protocol.bmp.parser.BmpActivator;
import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
+import org.opendaylight.protocol.bmp.parser.BmpActivator;
import org.opendaylight.protocol.bmp.spi.registry.BmpExtensionProviderActivator;
import org.opendaylight.protocol.bmp.spi.registry.BmpExtensionProviderContext;
import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
public void setUp() {
final BmpExtensionProviderContext ctx = new SimpleBmpExtensionProviderContext();
this.bmpActivator = new BmpActivator(
- ServiceLoaderBGPExtensionProviderContext.getSingletonInstance());
+ ServiceLoaderBGPExtensionProviderContext.getSingletonInstance());
this.bmpActivator.start(ctx);
this.bmpDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(), ctx.getBmpMessageRegistry(),
- new DefaultBmpSessionFactory());
+ new DefaultBmpSessionFactory());
this.serverPort = BmpMockDispatcherTest.getRandomPort();
}
@Test
public void testMain() throws Exception {
- final BmpSessionListenerFactory bmpSessionListenerFactory = new BmpSessionListenerFactory() {
- @Override
- public BmpSessionListener getSessionListener() {
- return BmpMockTest.this.sessionListener;
- }
- };
- Channel serverChannel = bmpDispatcher.createServer(new InetSocketAddress("127.0.0.1", serverPort),
- bmpSessionListenerFactory, Optional.<KeyMapping>absent()).channel();
+ final BmpSessionListenerFactory bmpSessionListenerFactory = () -> BmpMockTest.this.sessionListener;
+ final ChannelFuture futureServer = bmpDispatcher.createServer(new InetSocketAddress("127.0.0.1", serverPort),
+ bmpSessionListenerFactory, Optional.<KeyMapping>absent());
+ waitFutureSuccess(futureServer);
+ Channel serverChannel = futureServer.channel();
- BmpMock.main(new String[] {"--remote_address", "127.0.0.1:" + serverPort, "--peers_count", "3", "--pre_policy_routes", "3"});
+ BmpMock.main(new String[]{"--remote_address", "127.0.0.1:" + serverPort, "--peers_count", "3", "--pre_policy_routes", "3"});
Thread.sleep(1000);
Mockito.verify(this.sessionListener).onSessionUp(Mockito.any(BmpSession.class));
//1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
@Test
public void testMainInPassiveMode() throws Exception {
- final BmpSessionListenerFactory bmpSessionListenerFactory = new BmpSessionListenerFactory() {
- @Override
- public BmpSessionListener getSessionListener() {
- return BmpMockTest.this.sessionListener;
- }
- };
+ final BmpSessionListenerFactory bmpSessionListenerFactory = () -> BmpMockTest.this.sessionListener;
// create a local server in passive mode instead
BmpMock.main(new String[]{"--local_address", "127.0.0.1:" + serverPort, "--peers_count", "3", "--pre_policy_routes", "3", "--passive"});
- Channel serverChannel = bmpDispatcher.createClient(new InetSocketAddress("127.0.0.1", serverPort),
- bmpSessionListenerFactory, Optional.<KeyMapping>absent()).channel();
-
+ final ChannelFuture futureServer = bmpDispatcher.createClient(new InetSocketAddress("127.0.0.1", serverPort),
+ bmpSessionListenerFactory, Optional.<KeyMapping>absent());
+ waitFutureSuccess(futureServer);
+ Channel serverChannel = futureServer.channel();
Thread.sleep(1000);
Mockito.verify(this.sessionListener).onSessionUp(Mockito.any(BmpSession.class));
//1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
serverChannel.close().sync();
}
+
+ static void waitFutureSuccess(final ChannelFuture future) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ future.addListener(future1 -> latch.countDown());
+ Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
+ }
}
<groupId>org.opendaylight.mdsal.model</groupId>
<artifactId>ietf-inet-types-2013-07-15</artifactId>
</dependency>
-
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bgp-parser-impl</artifactId>
<artifactId>org.osgi.core</artifactId>
<scope>provided</scope>
</dependency>
-
<!-- Testing dependencies -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
-
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-manager</artifactId>
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
import org.slf4j.LoggerFactory;
-// Disabling this test for now as it's failing frequently on jenkins, seems due to infrastructure issues. The
-// tests succeed when run locally.
-@Ignore
public class BGPDispatcherImplTest {
private static final AsNumber AS_NUMBER = new AsNumber(30L);
configureClient(ctx);
}
+ private static <T extends Future> void waitFutureSuccess(final T future) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ future.addListener(future1 -> latch.countDown());
+ Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
+ }
+
+
+ public static void checkIdleState (final SimpleSessionListener listener){
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ if (BGPSessionImpl.State.IDLE != listener.getState()){
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ } else {
+ return;
+ }
+ }
+ Assert.fail();
+ }
+
private void configureClient(final BGPExtensionProviderContext ctx) {
final InetSocketAddress clientAddress = new InetSocketAddress("127.0.11.0", 1791);
final IpAddress clientPeerIp = new IpAddress(new Ipv4Address(clientAddress.getAddress().getHostAddress()));
this.registry.addPeer(new IpAddress(new Ipv4Address(serverAddress.getAddress().getHostAddress())), this.serverListener, createPreferences(serverAddress));
LoggerFactory.getLogger(BGPDispatcherImplTest.class).info("createServer");
final ChannelFuture future = this.serverDispatcher.createServer(this.registry, serverAddress);
- future.addListener(future1 -> Preconditions.checkArgument(future1.isSuccess(), "Unable to start bgp server on %s", future1.cause()));
+ future.addListener(new GenericFutureListener<Future<Void>>() {
+ @Override
+ public void operationComplete(final Future<Void> future) {
+ Preconditions.checkArgument(future.isSuccess(), "Unable to start bgp server on %s", future.cause());
+ }
+ });
+ waitFutureSuccess(future);
return future.channel();
}
final InetSocketAddress serverAddress = new InetSocketAddress("127.0.10.0", 1790);
final Channel serverChannel = createServer(serverAddress);
Thread.sleep(1000);
- final BGPSessionImpl session = this.clientDispatcher.createClient(serverAddress, this.registry, 0, Optional.absent()).get();
+ final Future<BGPSessionImpl> futureClient = this.clientDispatcher.createClient(serverAddress, this.registry, 2, Optional.absent());
+ waitFutureSuccess(futureClient);
+ final BGPSessionImpl session = futureClient.get();
Assert.assertEquals(BGPSessionImpl.State.UP, this.clientListener.getState());
Assert.assertEquals(BGPSessionImpl.State.UP, this.serverListener.getState());
Assert.assertEquals(AS_NUMBER, session.getAsNumber());
Assert.assertTrue(serverChannel.isWritable());
session.close();
- Thread.sleep(3000);
- Assert.assertEquals(BGPSessionImpl.State.IDLE, this.clientListener.getState());
- Assert.assertEquals(BGPSessionImpl.State.IDLE, this.serverListener.getState());
+ checkIdleState(this.clientListener);
+ checkIdleState(this.serverListener);
}
@Test
public void testCreateReconnectingClient() throws Exception {
final InetSocketAddress serverAddress = new InetSocketAddress("127.0.20.0", 1792);
final Future<Void> future = this.clientDispatcher.createReconnectingClient(serverAddress, this.registry, RETRY_TIMER, Optional.absent());
+ waitFutureSuccess(future);
final Channel serverChannel = createServer(serverAddress);
Assert.assertEquals(BGPSessionImpl.State.UP, this.serverListener.getState());
Assert.assertTrue(serverChannel.isWritable());
future.cancel(true);
this.serverListener.releaseConnection();
- Thread.sleep(3000);
- Assert.assertEquals(BGPSessionImpl.State.IDLE, this.serverListener.getState());
+ checkIdleState(this.serverListener);
}
private BGPSessionPreferences createPreferences(final InetSocketAddress socketAddress) {
final List<BgpParameters> tlvs = Lists.newArrayList();
final List<OptionalCapabilities> capas = Lists.newArrayList();
capas.add(new OptionalCapabilitiesBuilder().setCParameters(new CParametersBuilder().addAugmentation(
- CParameters1.class, new CParameters1Builder().setMultiprotocolCapability(new MultiprotocolCapabilityBuilder()
+ CParameters1.class, new CParameters1Builder().setMultiprotocolCapability(new MultiprotocolCapabilityBuilder()
.setAfi(IPV_4_TT.getAfi()).setSafi(IPV_4_TT.getSafi()).build()).build())
- .setAs4BytesCapability(new As4BytesCapabilityBuilder().setAsNumber(new AsNumber(30L)).build())
- .build()).build());
+ .setAs4BytesCapability(new As4BytesCapabilityBuilder().setAsNumber(new AsNumber(30L)).build())
+ .build()).build());
capas.add(new OptionalCapabilitiesBuilder().setCParameters(BgpExtendedMessageUtil.EXTENDED_MESSAGE_CAPABILITY).build());
tlvs.add(new BgpParametersBuilder().setOptionalCapabilities(capas).build());
- return new BGPSessionPreferences(AS_NUMBER, (short) 4, new BgpId(socketAddress.getAddress().getHostAddress()), AS_NUMBER, tlvs, Optional.absent());
+ final BgpId bgpId = new BgpId(new Ipv4Address(socketAddress.getAddress().getHostAddress()));
+ return new BGPSessionPreferences(AS_NUMBER, (short) 4, bgpId, AS_NUMBER, tlvs, Optional.absent());
}
-
}
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.opendaylight.protocol.bgp.rib.impl.BGPDispatcherImplTest.checkIdleState;
import com.google.common.collect.Lists;
import io.netty.channel.Channel;
this.bgpSession.sessionUp();
Assert.assertEquals(BGPSessionImpl.State.UP, this.listener.getState());
this.bgpSession.endOfInput();
- Thread.sleep(3000);
- Assert.assertEquals(BGPSessionImpl.State.IDLE, this.listener.getState());
+ checkIdleState(this.listener);
}
@Test
public void testHoldTimerExpire() throws InterruptedException {
this.bgpSession.sessionUp();
- Thread.sleep(3500);
- Assert.assertEquals(BGPSessionImpl.State.IDLE, this.bgpSession.getState());
+ checkIdleState(this.listener);
Assert.assertEquals(3, this.receivedMsgs.size());
Assert.assertTrue(this.receivedMsgs.get(2) instanceof Notify);
final Notify error = (Notify) this.receivedMsgs.get(2);
assertTrue(this.receivedMsgs.get(1) instanceof Keepalive);
this.clientSession.handleMessage(new KeepaliveBuilder().build());
assertEquals(this.clientSession.getState(), BGPClientSessionNegotiator.State.FINISHED);
- Thread.sleep(1000);
- Thread.sleep(100);
}
@Test
final Optional<KeyMapping> optionalKey = Optional.fromNullable(keys);
setChannelFactory(b, optionalKey);
b.option(ChannelOption.SO_KEEPALIVE, true);
+ b.option(ChannelOption.SO_REUSEADDR, true);
b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
final long retryTimer = reconnectTime == -1 ? 0 : reconnectTime;
final PCCReconnectPromise promise = new PCCReconnectPromise(remoteAddress, (int) retryTimer, CONNECT_TIMEOUT, b);
package org.opendaylight.protocol.pcep.pcc.mock;
+import static org.opendaylight.protocol.pcep.pcc.mock.PCCMockCommon.checkSessionListenerNotNull;
+import static org.opendaylight.protocol.pcep.pcc.mock.WaitForFutureSucces.waitFutureSuccess;
+
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.opendaylight.protocol.pcep.PCEPCapability;
import org.opendaylight.protocol.pcep.PCEPDispatcher;
import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.pcc.mock.protocol.PCCDispatcherImpl;
import org.opendaylight.protocol.pcep.spi.pojo.ServiceLoaderPCEPExtensionProviderContext;
-// This test is set to ignore for now as it's failing frequently on jenkins due to
-// infrastructure issues.
-@Ignore
+@RunWith(Parameterized.class)
public class PCCDispatcherImplTest {
private static final List<PCEPCapability> CAPS = new ArrayList<>();
private static final PCEPSessionProposalFactory PROPOSAL = new BasePCEPSessionProposalFactory(30, 120, CAPS);
-
- private PCCDispatcherImpl dispatcher;
private final DefaultPCEPSessionNegotiatorFactory nf = new DefaultPCEPSessionNegotiatorFactory(PROPOSAL, 0);
+ private final Random random = new Random();
+ private PCCDispatcherImpl dispatcher;
private PCEPDispatcher pcepDispatcher;
private InetSocketAddress serverAddress;
private InetSocketAddress clientAddress;
- private final Random random = new Random();
private EventLoopGroup workerGroup;
private EventLoopGroup bossGroup;
+ @Parameterized.Parameters
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[100][0]);
+ }
+
+ public PCCDispatcherImplTest() {
+ }
+
@Before
public void setUp() {
this.workerGroup = new NioEventLoopGroup();
this.bossGroup = new NioEventLoopGroup();
this.dispatcher = new PCCDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry());
this.pcepDispatcher = new PCEPDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry(),
- this.nf, this.bossGroup, this.workerGroup);
+ this.nf, this.bossGroup, this.workerGroup);
this.serverAddress = new InetSocketAddress("127.0.5.0", getRandomPort());
this.clientAddress = new InetSocketAddress("127.0.4.0", getRandomPort());
}
@After
public void tearDown() throws InterruptedException, ExecutionException {
this.dispatcher.close();
+ closeEventLoopGroups();
+ }
+
+ private void closeEventLoopGroups() throws ExecutionException, InterruptedException {
this.workerGroup.shutdownGracefully().get();
this.bossGroup.shutdownGracefully().get();
}
@Test
public void testClientReconnect() throws Exception {
final Future<PCEPSession> futureSession = this.dispatcher.createClient(this.serverAddress, 1, new TestingSessionListenerFactory(),
- this.nf, null, this.clientAddress);
-
+ this.nf, null, this.clientAddress);
+ waitFutureSuccess(futureSession);
final TestingSessionListenerFactory slf = new TestingSessionListenerFactory();
- final Channel channel = this.pcepDispatcher.createServer(this.serverAddress, slf, null).channel();
+ final ChannelFuture futureServer = this.pcepDispatcher.createServer(this.serverAddress, slf, null);
+ waitFutureSuccess(futureServer);
+ final Channel channel = futureServer.channel();
Assert.assertNotNull(futureSession.get());
- final TestingSessionListener sl = slf.getSessionListenerByRemoteAddress(this.clientAddress.getAddress());
- Assert.assertNotNull(sl);
+ checkSessionListenerNotNull(slf, "127.0.4.0");
+ final TestingSessionListener sl = checkSessionListenerNotNull(slf, this.clientAddress.getAddress().getHostAddress());
+ Assert.assertNotNull(sl.getSession());
Assert.assertTrue(sl.isUp());
-
channel.close().get();
- this.workerGroup.shutdownGracefully().get();
- this.bossGroup.shutdownGracefully().get();
+ closeEventLoopGroups();
this.workerGroup = new NioEventLoopGroup();
this.bossGroup = new NioEventLoopGroup();
this.pcepDispatcher = new PCEPDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry(),
- this.nf, this.bossGroup, this.workerGroup);
+ this.nf, this.bossGroup, this.workerGroup);
final TestingSessionListenerFactory slf2 = new TestingSessionListenerFactory();
- this.pcepDispatcher.createServer(this.serverAddress, slf2, null).channel();
- // sleep for bit more than retry time of 1 sec.
- Thread.sleep(1500);
-
- final TestingSessionListener sl2 = slf2.getSessionListenerByRemoteAddress(this.clientAddress.getAddress());
- Assert.assertNotNull(sl2);
+ final ChannelFuture future2 = this.pcepDispatcher.createServer(this.serverAddress, slf2, null);
+ waitFutureSuccess(future2);
+ final Channel channel2 = future2.channel();
+ final TestingSessionListener sl2 = checkSessionListenerNotNull(slf2, this.clientAddress.getAddress().getHostAddress());
+ Assert.assertNotNull(sl2.getSession());
Assert.assertTrue(sl2.isUp());
}
private int getRandomPort() {
return this.random.nextInt(4000) + 1024;
}
-
}
checkSynchronizedSession(8, pceSessionListener, numberOflspAndDBv);
Thread.sleep(4000);
assertFalse(pceSessionListener.isUp());
- Thread.sleep(6000);
- Thread.sleep(1000);
final int expetedNumberOfLspAndEndOfSync = 3;
final BigInteger expectedFinalDBVersion = BigInteger.valueOf(10);
final TestingSessionListener sessionListenerAfterReconnect = getListener(factory);
- checkResyncSession(Optional.<Integer>absent(), expetedNumberOfLspAndEndOfSync, numberOflspAndDBv, expectedFinalDBVersion, sessionListenerAfterReconnect);
+ checkResyncSession(Optional.absent(), expetedNumberOfLspAndEndOfSync, numberOflspAndDBv, expectedFinalDBVersion, sessionListenerAfterReconnect);
channel.close().get();
}
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.opendaylight.protocol.pcep.pcc.mock.WaitForFutureSucces.waitFutureSuccess;
import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
import org.junit.Before;
import org.opendaylight.protocol.pcep.PCEPCapability;
import org.opendaylight.protocol.pcep.PCEPDispatcher;
private final static short KEEP_ALIVE = 40;
private final static short DEAD_TIMER = 120;
protected final InetSocketAddress socket = new InetSocketAddress(PCCMockCommon.REMOTE_ADDRESS, getPort());
+ protected PCCSessionListener pccSessionListener;
private PCEPDispatcher pceDispatcher;
private PCCDispatcherImpl pccDispatcher;
- protected PCCSessionListener pccSessionListener;
protected abstract List<PCEPCapability> getCapabilities();
nf, new NioEventLoopGroup(), new NioEventLoopGroup());
}
- protected static TestingSessionListener checkSessionListener(final int numMessages, final Channel channel, final TestingSessionListenerFactory factory, final String localAddress) throws
- ExecutionException, InterruptedException {
- final TestingSessionListener sessionListener = factory.getSessionListenerByRemoteAddress(InetAddresses.forString(localAddress));
- assertNotNull(sessionListener);
+ protected static TestingSessionListener checkSessionListener(final int numMessages, final Channel channel, final TestingSessionListenerFactory factory,
+ final String localAddress) throws
+ Exception {
+ final TestingSessionListener sessionListener = checkSessionListenerNotNull(factory, localAddress);
assertTrue(sessionListener.isUp());
+ checkNumberOfMessages(numMessages, sessionListener);
assertEquals(numMessages, sessionListener.messages().size());
channel.close().get();
return sessionListener;
}
- protected Channel createServer(final TestingSessionListenerFactory factory, final InetSocketAddress serverAddress2) {
+ private static void checkNumberOfMessages(final int expectedNMessages, final TestingSessionListener listener) throws Exception {
+ Stopwatch sw = Stopwatch.createStarted();
+ while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ if (expectedNMessages != listener.messages().size()) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ } else {
+ return;
+ }
+ }
+ Assert.assertEquals(expectedNMessages, listener.messages().size());
+ }
+
+ static TestingSessionListener checkSessionListenerNotNull(final TestingSessionListenerFactory factory, final String localAddress) {
+ Stopwatch sw = Stopwatch.createStarted();
+ TestingSessionListener listener = null;
+ while (sw.elapsed(TimeUnit.SECONDS) <= 1000) {
+ listener = factory.getSessionListenerByRemoteAddress(InetAddresses.forString(localAddress));
+ if (listener == null) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ } else {
+ return listener;
+ }
+ }
+ throw new NullPointerException();
+ }
+
+ protected Channel createServer(final TestingSessionListenerFactory factory, final InetSocketAddress serverAddress2) throws InterruptedException {
return createServer(factory, serverAddress2, null);
}
protected Channel createServer(final TestingSessionListenerFactory factory, final InetSocketAddress
- serverAddress2, final PCEPPeerProposal peerProposal) {
+ serverAddress2, final PCEPPeerProposal peerProposal) throws InterruptedException {
final PCEPExtensionProviderContext ctx = ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance();
final StatefulActivator activator07 = new StatefulActivator();
final SyncOptimizationsActivator optimizationsActivator = new SyncOptimizationsActivator();
activator07.start(ctx);
optimizationsActivator.start(ctx);
- return this.pceDispatcher.createServer(serverAddress2, factory, peerProposal).channel();
+ final ChannelFuture future = this.pceDispatcher.createServer(serverAddress2, factory, peerProposal);
+ waitFutureSuccess(future);
+ return future.channel();
}
protected static void checkSynchronizedSession(final int numberOfLsp, final TestingSessionListener pceSessionListener, final BigInteger expectedeInitialDb) throws InterruptedException {
final List<Message> messages = pceSessionListener.messages();
int numberOfSyncMessage = 1;
int numberOfLspExpected = numberOfLsp;
- if(!expectedeInitialDb.equals(BigInteger.ZERO)) {
+ if (!expectedeInitialDb.equals(BigInteger.ZERO)) {
checkSequequenceDBVersionSync(messages, expectedeInitialDb);
numberOfLspExpected += numberOfSyncMessage;
}
}
protected static void checkResyncSession(final Optional<Integer> startAtNumberLsp, final int expectedNumberOfLsp, final BigInteger startingDBVersion,
- final BigInteger expectedDBVersion, final TestingSessionListener pceSessionListener) {
+ final BigInteger expectedDBVersion, final TestingSessionListener pceSessionListener) throws InterruptedException {
+ assertNotNull(pceSessionListener.getSession());
assertTrue(pceSessionListener.isUp());
+ Thread.sleep(50);
List<Message> messages;
- if(startAtNumberLsp.isPresent()) {
+ if (startAtNumberLsp.isPresent()) {
messages = pceSessionListener.messages().subList(startAtNumberLsp.get(), startAtNumberLsp.get() + expectedNumberOfLsp);
} else {
messages = pceSessionListener.messages();
}
protected TestingSessionListener getListener(final TestingSessionListenerFactory factory) {
- return factory.getSessionListenerByRemoteAddress(InetAddresses.forString(PCCMockTest.LOCAL_ADDRESS));
+ return checkSessionListenerNotNull(factory, PCCMockTest.LOCAL_ADDRESS);
}
}
"--redelegation-timeout", "0", "--state-timeout", "-1"};
@Test
- public void testSessionEstablishment() throws UnknownHostException, InterruptedException, ExecutionException {
+ public void testSessionEstablishment() throws Exception {
final TestingSessionListenerFactory factory = new TestingSessionListenerFactory();
final Channel channel = createServer(factory, this.socket);
Main.main(mainInput);
@Test
- public void testMockPCCToManyPCE() throws InterruptedException, ExecutionException, UnknownHostException {
+ public void testMockPCCToManyPCE() throws Exception {
final TestingSessionListenerFactory factory = new TestingSessionListenerFactory();
final TestingSessionListenerFactory factory2 = new TestingSessionListenerFactory();
final TestingSessionListenerFactory factory3 = new TestingSessionListenerFactory();
final TestingSessionListenerFactory factory = new TestingSessionListenerFactory();
final Channel channel = createServer(factory, socket, new PCCPeerProposal());
- Thread.sleep(200);
PCEPSession session = createPCCSession(BigInteger.TEN).get();
assertNotNull(session);
final TestingSessionListener pceSessionListener = getListener(factory);
assertNotNull(pceSessionListener);
- Thread.sleep(1000);
+ assertNotNull(pceSessionListener.getSession());
checkResyncSession(Optional.<Integer>absent(), 11, null, BigInteger.valueOf(10), pceSessionListener);
channel.close().get();
}
final int lspQuantity = 3;
final BigInteger numberOflspAndDBv = BigInteger.valueOf(lspQuantity);
this.channel = createServer(factory, socket, new PCCPeerProposal());
- Thread.sleep(300);
PCEPSession session = createPCCSession(numberOflspAndDBv).get();
assertNotNull(session);
final TestingSessionListener pceSessionListener = getListener(factory);
assertNotNull(pceSessionListener);
checkSynchronizedSession(lspQuantity, pceSessionListener, numberOflspAndDBv);
pccSessionListener.onMessage(session, createTriggerLspResync());
- Thread.sleep(300);
final TestingSessionListener sessionListenerAfterReconnect = getListener(factory);
checkResyncSession(Optional.of(lspQuantity), 4, null, numberOflspAndDBv, sessionListenerAfterReconnect);
channel.close().get();
final BigInteger numberOflspAndDBv = BigInteger.valueOf(lspQuantity);
this.channel = createServer(factory, socket, new PCCPeerProposal());
- Thread.sleep(200);
PCEPSession session = createPCCSession(numberOflspAndDBv).get();
assertNotNull(session);
final TestingSessionListener pceSessionListener = getListener(factory);
assertNotNull(pceSessionListener);
checkSynchronizedSession(lspQuantity, pceSessionListener, numberOflspAndDBv);
pccSessionListener.onMessage(session, createTriggerLspResync());
- Thread.sleep(300);
final TestingSessionListener sessionListenerAfterReconnect = getListener(factory);
checkResyncSession(Optional.of(lspQuantity), 2, null, numberOflspAndDBv, sessionListenerAfterReconnect);
channel.close().get();
public void testSessionTriggeredSync() throws Exception {
final TestingSessionListenerFactory factory = new TestingSessionListenerFactory();
this.channel = createServer(factory, socket, new PCCPeerProposal());
- Thread.sleep(200);
final BigInteger numberOflspAndDBv = BigInteger.valueOf(3);
PCEPSession session = createPCCSession(numberOflspAndDBv).get();
assertNotNull(session);
assertNotNull(pceSessionListener);
checkSynchronizedSession(0, pceSessionListener, BigInteger.ZERO);
pccSessionListener.onMessage(session, createTriggerMsg());
- Thread.sleep(300);
checkSynchronizedSession(3, pceSessionListener, numberOflspAndDBv);
this.channel.close().get();
}
package org.opendaylight.protocol.pcep.pcc.mock;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.PCEPTerminationReason;
public class TestingSessionListener implements PCEPSessionListener {
private static final Logger LOG = LoggerFactory.getLogger(TestingSessionListener.class);
+ private final CountDownLatch sessionLatch = new CountDownLatch(1);
private final List<Message> messages = Lists.newArrayList();
LOG.debug("Session up.");
this.up = true;
this.session = session;
+ sessionLatch.countDown();
+
}
@Override
LOG.debug("Session terminated. Cause : {}", cause);
}
- public List<Message> messages() {
+ List<Message> messages() {
return this.messages;
}
- public boolean isUp () {
+ boolean isUp () {
return this.up;
}
public PCEPSession getSession() {
+ Assert.assertEquals("Session up", true, Uninterruptibles.awaitUninterruptibly(sessionLatch, 10, TimeUnit.SECONDS));
return this.session;
}
}
return sessionListener;
}
- public TestingSessionListener getSessionListenerByRemoteAddress(final InetAddress ipAddress) {
+ TestingSessionListener getSessionListenerByRemoteAddress(final InetAddress ipAddress) {
for (final TestingSessionListener sessionListener : this.sessionListeners) {
if (sessionListener.isUp()) {
final PCEPSession session = sessionListener.getSession();
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.pcep.pcc.mock;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.netty.util.concurrent.Future;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+class WaitForFutureSucces {
+ private WaitForFutureSucces() {
+ throw new UnsupportedOperationException();
+ }
+
+ static <T extends Future> void waitFutureSuccess(final T future) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ future.addListener(future1 -> latch.countDown());
+ Uninterruptibles.awaitUninterruptibly(latch, 20, TimeUnit.SECONDS);
+ }
+}