import io.netty.util.concurrent.Future;
import java.math.BigInteger;
import java.net.InetSocketAddress;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
nf, new NioEventLoopGroup(), new NioEventLoopGroup());
}
- private static void checkNumberOfMessages(final int expectedNMessages, final TestingSessionListener listener) throws Exception {
+ private static List<Message> checkNumberOfMessages(final int expectedNMessages, final TestingSessionListener listener) {
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
if (expectedNMessages != listener.messages().size()) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
} else {
- return;
+ return listener.messages();
}
}
Assert.assertEquals(expectedNMessages, listener.messages().size());
+ return listener.messages();
+ }
+
+ private static List<Message> checkNumberOfMessages(final Optional<Integer> startAtNumberLsp, final int expectedNMessages, final TestingSessionListener listener) {
+ List<Message> messages = Collections.emptyList();
+ Stopwatch sw = Stopwatch.createStarted();
+ while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ messages = listener.messages();
+ if(startAtNumberLsp.isPresent()) {
+ if (startAtNumberLsp.get() + expectedNMessages > messages.size()) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ continue;
+ } else {
+ messages = messages.subList(startAtNumberLsp.get(), startAtNumberLsp.get() + expectedNMessages);
+ }
+ }
+ if (expectedNMessages != messages.size()) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ } else {
+ return messages;
+ }
+ }
+ Assert.assertEquals(expectedNMessages, messages.size());
+ return messages;
}
static TestingSessionListener checkSessionListenerNotNull(final TestingSessionListenerFactory factory, final String localAddress) {
protected static void checkSynchronizedSession(final int numberOfLsp, final TestingSessionListener pceSessionListener, final BigInteger expectedeInitialDb) throws InterruptedException {
assertNotNull(pceSessionListener.getSession());
assertTrue(pceSessionListener.isUp());
- Thread.sleep(1000);
//Send Open with LspDBV = 1
- final List<Message> messages = pceSessionListener.messages();
int numberOfSyncMessage = 1;
int numberOfLspExpected = numberOfLsp;
if(!expectedeInitialDb.equals(BigInteger.ZERO)) {
- checkSequequenceDBVersionSync(messages, expectedeInitialDb);
numberOfLspExpected += numberOfSyncMessage;
}
- assertEquals(numberOfLspExpected, messages.size());
+ final List<Message> messages = checkNumberOfMessages(numberOfLspExpected, pceSessionListener);
+ if(!expectedeInitialDb.equals(BigInteger.ZERO)) {
+ checkSequequenceDBVersionSync(messages, expectedeInitialDb);
+ }
final PCEPSession session = pceSessionListener.getSession();
checkSession(session, DEAD_TIMER, KEEP_ALIVE);
final BigInteger expectedDBVersion, final TestingSessionListener pceSessionListener) throws InterruptedException {
assertNotNull(pceSessionListener.getSession());
assertTrue(pceSessionListener.isUp());
- Thread.sleep(50);
- List<Message> messages;
- if(startAtNumberLsp.isPresent()) {
- messages = pceSessionListener.messages().subList(startAtNumberLsp.get(), startAtNumberLsp.get() + expectedNumberOfLsp);
- } else {
- messages = pceSessionListener.messages();
- }
+ final List<Message> messages = checkNumberOfMessages(startAtNumberLsp, expectedNumberOfLsp, pceSessionListener);
checkSequequenceDBVersionSync(messages, expectedDBVersion);
- assertEquals(expectedNumberOfLsp, messages.size());
final PCEPSession session = pceSessionListener.getSession();
checkSession(session, DEAD_TIMER, KEEP_ALIVE);