import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.protocol.concepts.KeyMapping;
import org.opendaylight.protocol.pcep.PCEPCapability;
-import org.opendaylight.protocol.pcep.PCEPSessionListener;
-import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
import org.opendaylight.protocol.pcep.ietf.initiated00.CrabbeInitiatedActivator;
import org.opendaylight.protocol.pcep.ietf.stateful07.StatefulActivator;
private final int redelegationTimeout;
private final int stateTimeout;
private final PCEPCapability pcepCapabilities;
+ private final Timer timer = new HashedWheelTimer();
private PCCDispatcherImpl pccDispatcher;
- private Timer timer = new HashedWheelTimer();
- public PCCsBuilder(final int lsps, final boolean pcError, final int pccCount, @Nonnull final InetSocketAddress localAddress,
- @Nonnull final List<InetSocketAddress> remoteAddress, final short keepAlive, final short deadTimer, @Nonnull final String password,
- final long reconnectTime, final int redelegationTimeout, final int stateTimeout, @Nonnull final PCEPCapability pcepCapabilities) {
+ PCCsBuilder(final int lsps, final boolean pcError, final int pccCount, @Nonnull final InetSocketAddress localAddress,
+ @Nonnull final List<InetSocketAddress> remoteAddress, final short keepAlive, final short deadTimer,
+ @Nullable final String password, final long reconnectTime, final int redelegationTimeout, final int stateTimeout,
+ @Nonnull final PCEPCapability pcepCapabilities) {
this.lsps = lsps;
this.pcError = pcError;
this.pccCount = pccCount;
startActivators();
}
- void createPCCs(final BigInteger initialDBVersion, final Optional<TimerHandler> timerHandler) throws InterruptedException, ExecutionException {
+ void createPCCs(final BigInteger initialDBVersion, final Optional<TimerHandler> timerHandler)
+ throws InterruptedException, ExecutionException {
InetAddress currentAddress = this.localAddress.getAddress();
- this.pccDispatcher = new PCCDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry());
+ this.pccDispatcher = new PCCDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance()
+ .getMessageHandlerRegistry());
if(timerHandler.isPresent()) {
timerHandler.get().setPCCDispatcher(this.pccDispatcher);
}
for (int i = 0; i < this.pccCount; i++) {
- final PCCTunnelManager tunnelManager = new PCCTunnelManagerImpl(this.lsps, currentAddress, this.redelegationTimeout, this.stateTimeout,
- this.timer, timerHandler);
+ final PCCTunnelManager tunnelManager = new PCCTunnelManagerImpl(this.lsps, currentAddress,
+ this.redelegationTimeout, this.stateTimeout, this.timer, timerHandler);
createPCC(new InetSocketAddress(currentAddress, localAddress.getPort()), tunnelManager, initialDBVersion);
currentAddress = InetAddresses.increment(currentAddress);
}
}
private void createPCC(@Nonnull final InetSocketAddress localAddress, @Nonnull final PCCTunnelManager tunnelManager,
- final BigInteger initialDBVersion) throws InterruptedException, ExecutionException {
-
+ final BigInteger initialDBVersion) throws InterruptedException, ExecutionException {
final PCEPSessionNegotiatorFactory<PCEPSessionImpl> snf = getSessionNegotiatorFactory();
for (final InetSocketAddress pceAddress : this.remoteAddress) {
- this.pccDispatcher.createClient(pceAddress, this.reconnectTime,
- new PCEPSessionListenerFactory() {
- @Override
- public PCEPSessionListener getSessionListener() {
- return new PCCSessionListener(remoteAddress.indexOf(pceAddress), tunnelManager, pcError);
- }
- }, snf, KeyMapping.getKeyMapping(pceAddress.getAddress(), password), localAddress, initialDBVersion);
+ this.pccDispatcher.createClient(pceAddress, this.reconnectTime, () -> new PCCSessionListener(
+ remoteAddress.indexOf(pceAddress), tunnelManager, pcError), snf,
+ KeyMapping.getKeyMapping(pceAddress.getAddress(), password), localAddress, initialDBVersion);
}
}
private PCEPSessionNegotiatorFactory<PCEPSessionImpl> getSessionNegotiatorFactory() {
final List<PCEPCapability> capabilities = Lists.newArrayList(this.pcepCapabilities);
- return new DefaultPCEPSessionNegotiatorFactory(new BasePCEPSessionProposalFactory(this.deadTimer, this.keepAlive, capabilities), 0);
+ return new DefaultPCEPSessionNegotiatorFactory(new BasePCEPSessionProposalFactory(this.deadTimer,
+ this.keepAlive, capabilities), 0);
}
private static void startActivators() {
private final EventLoopGroup workerGroup;
public PCCDispatcherImpl(@Nonnull final MessageRegistry registry) {
- if(Epoll.isAvailable()){
+ if (Epoll.isAvailable()) {
this.workerGroup = new EpollEventLoopGroup();
} else {
this.workerGroup = new NioEventLoopGroup();
@Override
public Future<PCEPSession> createClient(@Nonnull final InetSocketAddress remoteAddress, @Nonnull final long reconnectTime,
- @Nonnull final PCEPSessionListenerFactory listenerFactory, @Nonnull final PCEPSessionNegotiatorFactory negotiatorFactory,
- @Nonnull final KeyMapping keys, @Nullable final InetSocketAddress localAddress) {
+ @Nonnull final PCEPSessionListenerFactory listenerFactory, @Nonnull final PCEPSessionNegotiatorFactory negotiatorFactory,
+ @Nullable final KeyMapping keys, @Nonnull final InetSocketAddress localAddress) {
return createClient(remoteAddress, reconnectTime, listenerFactory, negotiatorFactory, keys, localAddress, BigInteger.ONE);
}
@Override
public Future<PCEPSession> createClient(@Nonnull final InetSocketAddress remoteAddress, @Nonnull final long reconnectTime,
- @Nonnull final PCEPSessionListenerFactory listenerFactory, @Nonnull final PCEPSessionNegotiatorFactory negotiatorFactory,
- @Nonnull final KeyMapping keys, @Nullable final InetSocketAddress localAddress, @Nonnull final BigInteger dbVersion) {
+ @Nonnull final PCEPSessionListenerFactory listenerFactory, @Nonnull final PCEPSessionNegotiatorFactory negotiatorFactory,
+ @Nullable final KeyMapping keys, @Nonnull final InetSocketAddress localAddress, @Nonnull final BigInteger dbVersion) {
final Bootstrap b = new Bootstrap();
b.group(this.workerGroup);
b.localAddress(localAddress);
}
private void setChannelFactory(final Bootstrap bootstrap, final Optional<KeyMapping> keys) {
- if(Epoll.isAvailable()) {
+ if (Epoll.isAvailable()) {
bootstrap.channel(EpollSocketChannel.class);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} else {
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PCCReconnectPromise extends DefaultPromise<PCEPSession> {
-
+final class PCCReconnectPromise extends DefaultPromise<PCEPSession> {
private static final Logger LOG = LoggerFactory.getLogger(PCCReconnectPromise.class);
private final InetSocketAddress address;
@GuardedBy("this")
private Future<?> pending;
- public PCCReconnectPromise(final InetSocketAddress address, final int retryTimer,
- final int connectTimeout, final Bootstrap b) {
+ PCCReconnectPromise(final InetSocketAddress address, final int retryTimer,
+ final int connectTimeout, final Bootstrap b) {
+ super(GlobalEventExecutor.INSTANCE);
this.address = address;
this.retryTimer = retryTimer;
this.connectTimeout = connectTimeout;
this.b = b;
}
- public synchronized void connect() {
+ synchronized void connect() {
try {
this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout);
this.b.remoteAddress(this.address);
@SuppressWarnings("deprecation")
@Override
public synchronized Promise<PCEPSession> setSuccess(final PCEPSession result) {
+ final Promise<PCEPSession> promise = super.setSuccess(result);
LOG.debug("Promise {} completed", this);
- return super.setSuccess(result);
+ return promise;
}
- protected boolean isInitialConnectFinished() {
+ synchronized boolean isInitialConnectFinished() {
Preconditions.checkNotNull(this.pending);
return this.pending.isDone() && this.pending.isSuccess();
}
private final class BootstrapConnectListener implements ChannelFutureListener {
+ @GuardedBy("this")
private final Object lock;
- public BootstrapConnectListener(final Object lock) {
+ BootstrapConnectListener(final Object lock) {
this.lock = lock;
}
@Override
public void operationComplete(final ChannelFuture cf) throws Exception {
-
synchronized (this.lock) {
if (PCCReconnectPromise.this.isCancelled()) {
if (cf.isSuccess()) {
}
final EventLoop loop = cf.channel().eventLoop();
- loop.schedule(new Runnable() {
- @Override
- public void run() {
+ loop.schedule(() -> {
+ synchronized (PCCReconnectPromise.this) {
PCCReconnectPromise.LOG.debug("Attempting to connect to {}", PCCReconnectPromise.this.address);
final Future reconnectFuture = PCCReconnectPromise.this.b.connect();
- reconnectFuture.addListener(PCCReconnectPromise.BootstrapConnectListener.this);
+ reconnectFuture.addListener(BootstrapConnectListener.this);
PCCReconnectPromise.this.pending = reconnectFuture;
}
}, PCCReconnectPromise.this.retryTimer, TimeUnit.SECONDS);
public class PCCIncrementalSyncTest extends PCCMockCommon {
- private BigInteger lsp = BigInteger.valueOf(8);
+ private final BigInteger lsp = BigInteger.valueOf(8);
/**
* Test Incremental Synchronization
* Create 8 lsp, then it disconnects after 5 sec and then after 5 sec reconnects with Pcc DBVersion 10
* After reconnection PCE has DBVersion 10, therefore there is 9 changes missed. 9 Pcrt + 1 Pcrt-Sync
*/
- private final String[] mainInputIncrementalSync = new String[]{"--local-address", this.localAddress.getHostString(), "--remote-address",
- InetSocketAddressUtil.toHostAndPort(this.remoteAddress).toString(), "--pcc", "1", "--lsp", lsp.toString(), "--log-level", "DEBUG", "-ka", "40", "-d", "120",
- "--reconnect", "-1", "--redelegation-timeout", "0", "--state-timeout", "-1", "--incremental-sync-procedure", "10", "5", "5"};
+ private final String[] mainInputIncrementalSync = new String[]{"--local-address", this.localAddress.getHostString(),
+ "--remote-address", InetSocketAddressUtil.toHostAndPort(this.remoteAddress).toString(), "--pcc", "1", "--lsp",
+ lsp.toString(), "--log-level", "DEBUG", "-ka", "30", "-d", "120", "--reconnect", "-1", "--redelegation-timeout",
+ "0", "--state-timeout", "-1", "--incremental-sync-procedure", "10", "5", "5"};
@Test
public void testSessionIncrementalSyncEstablishment() throws UnknownHostException, InterruptedException, ExecutionException {
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import java.math.BigInteger;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.opendaylight.protocol.pcep.PCEPDispatcher;
import org.opendaylight.protocol.pcep.PCEPPeerProposal;
import org.opendaylight.protocol.pcep.PCEPSession;
-import org.opendaylight.protocol.pcep.PCEPSessionListener;
-import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
import org.opendaylight.protocol.pcep.ietf.stateful07.StatefulActivator;
import org.opendaylight.protocol.pcep.impl.BasePCEPSessionProposalFactory;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
public abstract class PCCMockCommon {
- private final static short KEEP_ALIVE = 40;
+ private final static short KEEP_ALIVE = 30;
private final static short DEAD_TIMER = 120;
+ private static final long SLEEP_FOR = 50;
protected final int port = InetSocketAddressUtil.getRandomPort();
protected final InetSocketAddress remoteAddress = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
protected final InetSocketAddress localAddress = new InetSocketAddress("127.0.0.1", port);
}
private static void checkNumberOfMessages(final int expectedNMessages, final TestingSessionListener listener) throws Exception {
- Stopwatch sw = Stopwatch.createStarted();
+ final Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
if (expectedNMessages != listener.messages().size()) {
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(SLEEP_FOR, TimeUnit.MILLISECONDS);
} else {
return;
}
}
static TestingSessionListener checkSessionListenerNotNull(final TestingSessionListenerFactory factory, final String localAddress) {
- Stopwatch sw = Stopwatch.createStarted();
- TestingSessionListener listener = null;
- while (sw.elapsed(TimeUnit.SECONDS) <= 1000) {
- listener = factory.getSessionListenerByRemoteAddress(InetAddresses.forString(localAddress));
+ final Stopwatch sw = Stopwatch.createStarted();
+ TestingSessionListener listener;
+ final InetAddress address = InetAddresses.forString(localAddress);
+ while (sw.elapsed(TimeUnit.SECONDS) <= 60) {
+ listener = factory.getSessionListenerByRemoteAddress(address);
if (listener == null) {
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(SLEEP_FOR, TimeUnit.MILLISECONDS);
} else {
return listener;
}
Thread.sleep(1000);
//Send Open with LspDBV = 1
final List<Message> messages = pceSessionListener.messages();
- int numberOfSyncMessage = 1;
+ final int numberOfSyncMessage = 1;
int numberOfLspExpected = numberOfLsp;
if (!expectedeInitialDb.equals(BigInteger.ZERO)) {
checkSequequenceDBVersionSync(messages, expectedeInitialDb);
assertNotNull(pceSessionListener.getSession());
assertTrue(pceSessionListener.isUp());
Thread.sleep(50);
- List<Message> messages;
+ final List<Message> messages;
if (startAtNumberLsp.isPresent()) {
messages = pceSessionListener.messages().subList(startAtNumberLsp.get(), startAtNumberLsp.get() + expectedNumberOfLsp);
} else {
}
protected static void checkSequequenceDBVersionSync(final List<Message> messages, final BigInteger expectedDbVersion) {
- for (Message msg : messages) {
+ for (final Message msg : messages) {
final List<Reports> pcrt = ((Pcrpt) msg).getPcrptMessage().getReports();
- for (Reports report : pcrt) {
+ for (final Reports report : pcrt) {
final Lsp lsp = report.getLsp();
if (lsp.getPlspId().getValue() == 0) {
assertEquals(false, lsp.isSync().booleanValue());
}
}
- protected Future<PCEPSession> createPCCSession(BigInteger DBVersion) {
+ protected Future<PCEPSession> createPCCSession(final BigInteger DBVersion) {
this.pccDispatcher = new PCCDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry());
final PCEPSessionNegotiatorFactory<PCEPSessionImpl> snf = getSessionNegotiatorFactory();
final PCCTunnelManager tunnelManager = new PCCTunnelManagerImpl(3, this.localAddress.getAddress(), 0, -1, new HashedWheelTimer(),
- Optional.<TimerHandler>absent());
+ Optional.absent());
return pccDispatcher.createClient(this.remoteAddress, -1,
- new PCEPSessionListenerFactory() {
- @Override
- public PCEPSessionListener getSessionListener() {
+ () -> {
pccSessionListener = new PCCSessionListener(1, tunnelManager, false);
return pccSessionListener;
- }
- }, snf, null, this.localAddress, DBVersion);
+ }, snf, null, this.localAddress, DBVersion);
}
private PCEPSessionNegotiatorFactory<PCEPSessionImpl> getSessionNegotiatorFactory() {