import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultPromise<S> {
+public final class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultPromise<S> {
private static final Logger LOG = LoggerFactory.getLogger(BGPProtocolSessionPromise.class);
private static final int CONNECT_TIMEOUT = 5000;
private final int retryTimer;
private final Bootstrap bootstrap;
private final BGPPeerRegistry peerRegistry;
+ @GuardedBy("this")
private final AutoCloseable listenerRegistration;
@GuardedBy("this")
private ChannelFuture pending;
private boolean connectSkipped;
- public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap, BGPPeerRegistry peerRegistry) {
+ public BGPProtocolSessionPromise(@Nonnull final InetSocketAddress remoteAddress, final int retryTimer,
+ @Nonnull final Bootstrap bootstrap, @Nonnull final BGPPeerRegistry peerRegistry) {
super(GlobalEventExecutor.INSTANCE);
this.address = Preconditions.checkNotNull(remoteAddress);
this.retryTimer = retryTimer;
this.bootstrap = Preconditions.checkNotNull(bootstrap);
this.peerRegistry = Preconditions.checkNotNull(peerRegistry);
- this.listenerRegistration = this.peerRegistry
- .registerPeerSessionListener(new BGPProtocolSessionPromise.PeerRegistrySessionListenerImpl(this,
- StrictBGPPeerRegistry.getIpAddress(this.address)));
+ this.listenerRegistration = this.peerRegistry.registerPeerSessionListener(
+ new BGPProtocolSessionPromise.PeerRegistrySessionListenerImpl(this,
+ StrictBGPPeerRegistry.getIpAddress(this.address)));
}
public synchronized void connect() {
final ChannelFuture connectFuture = this.bootstrap.connect();
connectFuture.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener(lock));
this.pending = connectFuture;
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.info("Failed to connect to {}", this.address, e);
this.setFailure(e);
}
final BGPProtocolSessionPromise lock = this;
final EventLoop loop = this.pending.channel().eventLoop();
- loop.schedule(new Runnable() {
- @Override
- public void run() {
+ loop.schedule(() -> {
+ synchronized (BGPProtocolSessionPromise.this) {
if (BGPProtocolSessionPromise.this.peerSessionPresent) {
LOG.debug("Connection to {} already exists", BGPProtocolSessionPromise.this.address);
BGPProtocolSessionPromise.this.connectSkipped = true;
}
}
- private void closePeerSessionListener() {
+ private synchronized void closePeerSessionListener() {
try {
this.listenerRegistration.close();
} catch (final Exception e) {
}
private class BootstrapConnectListener implements ChannelFutureListener {
+ @GuardedBy("this")
private final Object lock;
- public BootstrapConnectListener(final Object lock) {
+ BootstrapConnectListener(final Object lock) {
this.lock = lock;
}
}
private class PeerRegistrySessionListenerImpl implements PeerRegistrySessionListener {
+ @GuardedBy("this")
private final Object lock;
private final IpAddress peerAddress;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
+import javax.annotation.Nonnull;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
private final ChannelPipelineInitializer initializer;
private BGPProtocolSessionPromise<S> pending;
- public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
- final int retryTimer, final Bootstrap bootstrap,
- final BGPPeerRegistry peerRegistry,
- final ChannelPipelineInitializer initializer) {
+ public BGPReconnectPromise(@Nonnull final EventExecutor executor, @Nonnull final InetSocketAddress address,
+ final int retryTimer, @Nonnull final Bootstrap bootstrap, @Nonnull final BGPPeerRegistry peerRegistry,
+ @Nonnull final ChannelPipelineInitializer initializer) {
super(executor);
this.bootstrap = bootstrap;
this.initializer = Preconditions.checkNotNull(initializer);
}
// Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
- this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, new ChannelPipelineInitializer<S>() {
- @Override
- public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
- BGPReconnectPromise.this.initializer.initializeChannel(channel, promise);
- // add closed channel handler
- // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
- // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
- // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
- channel.pipeline().addLast(new ClosedChannelHandler(BGPReconnectPromise.this));
- }
+ this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, (channel, promise) -> {
+ this.initializer.initializeChannel(channel, promise);
+ // add closed channel handler
+ // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+ // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+ // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+ channel.pipeline().addLast(new ClosedChannelHandler(this));
});
- this.pending.addListener(new GenericFutureListener<Future<Object>>() {
- @Override
- public void operationComplete(final Future<Object> future) throws Exception {
- if (!future.isSuccess() && !BGPReconnectPromise.this.isDone()) {
- BGPReconnectPromise.this.setFailure(future.cause());
- }
+ this.pending.addListener(future -> {
+ if (!future.isSuccess() && !this.isDone()) {
+ this.setFailure(future.cause());
}
});
}
- public BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
+ private BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
final BGPPeerRegistry peerRegistry, final ChannelPipelineInitializer initializer) {
final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap, peerRegistry);
final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
/**
* @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
*/
- private boolean isInitialConnectFinished() {
+ private synchronized boolean isInitialConnectFinished() {
Preconditions.checkNotNull(this.pending);
return this.pending.isDone() && this.pending.isSuccess();
}
- private void reconnect() {
+ private synchronized void reconnect() {
Preconditions.checkNotNull(this.pending);
this.pending.reconnect();
}
private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
private final BGPReconnectPromise promise;
- public ClosedChannelHandler(final BGPReconnectPromise promise) {
+ ClosedChannelHandler(final BGPReconnectPromise promise) {
this.promise = promise;
}
import io.netty.util.concurrent.Promise;
import java.io.Closeable;
import java.net.InetSocketAddress;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.concepts.KeyMapping;
import org.opendaylight.protocol.pcep.PCEPDispatcher;
import org.opendaylight.protocol.pcep.PCEPPeerProposal;
private static final Integer SOCKET_BACKLOG_SIZE = 128;
private final PCEPSessionNegotiatorFactory snf;
private final PCEPHandlerFactory hf;
-
-
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final EventExecutor executor;
+ @GuardedBy("this")
private Optional<KeyMapping> keys;
/**
* @param bossGroup accepts an incoming connection
* @param workerGroup handles the traffic of accepted connection
*/
- public PCEPDispatcherImpl(final MessageRegistry registry,
- final PCEPSessionNegotiatorFactory negotiatorFactory,
- final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+ public PCEPDispatcherImpl(@Nonnull final MessageRegistry registry,
+ @Nonnull final PCEPSessionNegotiatorFactory negotiatorFactory,
+ @Nonnull final EventLoopGroup bossGroup, @Nonnull final EventLoopGroup workerGroup) {
this.snf = Preconditions.checkNotNull(negotiatorFactory);
this.hf = new PCEPHandlerFactory(registry);
if (Epoll.isAvailable()) {
}
@Override
- public synchronized ChannelFuture createServer(final InetSocketAddress address,
- final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
- return createServer(address, Optional.<KeyMapping>absent(), listenerFactory, peerProposal);
+ public final synchronized ChannelFuture createServer(final InetSocketAddress address,
+ final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
+ return createServer(address, Optional.absent(), listenerFactory, peerProposal);
}
@Override
- public synchronized ChannelFuture createServer(final InetSocketAddress address, final Optional<KeyMapping> keys,
- final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
+ public final synchronized ChannelFuture createServer(final InetSocketAddress address, final Optional<KeyMapping> keys,
+ final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
this.keys = keys;
final ChannelPipelineInitializer initializer = (ch, promise) -> {
- ch.pipeline().addLast(PCEPDispatcherImpl.this.hf.getDecoders());
- ch.pipeline().addLast("negotiator", PCEPDispatcherImpl.this.snf.getSessionNegotiator(listenerFactory, ch, promise, peerProposal));
- ch.pipeline().addLast(PCEPDispatcherImpl.this.hf.getEncoders());
+ ch.pipeline().addLast(this.hf.getDecoders());
+ ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(listenerFactory, ch, promise, peerProposal));
+ ch.pipeline().addLast(this.hf.getEncoders());
};
final ServerBootstrap b = createServerBootstrap(initializer);
return f;
}
- protected ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
+ synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
final ServerBootstrap b = new ServerBootstrap();
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
}
@Override
- public void close() {
+ public final void close() {
if (Epoll.isAvailable()) {
this.workerGroup.shutdownGracefully().awaitUninterruptibly();
this.bossGroup.shutdownGracefully().awaitUninterruptibly();
}
@Override
- public PCEPSessionNegotiatorFactory getPCEPSessionNegotiatorFactory() {
+ public final PCEPSessionNegotiatorFactory getPCEPSessionNegotiatorFactory() {
return this.snf;
}
}
import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
import org.opendaylight.protocol.pcep.PCEPSessionProposalFactory;
-import org.opendaylight.protocol.pcep.impl.PCEPDispatcherImpl.ChannelPipelineInitializer;
import org.opendaylight.protocol.pcep.spi.MessageRegistry;
import org.opendaylight.protocol.pcep.spi.pojo.ServiceLoaderPCEPExtensionProviderContext;
import org.opendaylight.protocol.util.InetSocketAddressUtil;
private PCEPDispatcherImpl dispatcher;
private PCEPDispatcherImpl disp2Spy;
- @Mock private Channel mockChannel;
+ @Mock
+ private Channel mockChannel;
private PCCMock pccMock;
public void setUp() {
MockitoAnnotations.initMocks(this);
final List<PCEPCapability> capList = new ArrayList<>();
- final PCEPSessionProposalFactory sessionProposal = new BasePCEPSessionProposalFactory(DEAD_TIMER, KEEP_ALIVE, capList);
+ final PCEPSessionProposalFactory sessionProposal = new BasePCEPSessionProposalFactory(DEAD_TIMER, KEEP_ALIVE,
+ capList);
final EventLoopGroup eventLoopGroup;
if (Epoll.isAvailable()) {
eventLoopGroup = new EpollEventLoopGroup();
eventLoopGroup = new NioEventLoopGroup();
}
final MessageRegistry msgReg = ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance()
- .getMessageHandlerRegistry();
+ .getMessageHandlerRegistry();
this.dispatcher = new PCEPDispatcherImpl(msgReg, new DefaultPCEPSessionNegotiatorFactory(sessionProposal, 0),
- eventLoopGroup, eventLoopGroup);
+ eventLoopGroup, eventLoopGroup);
Mockito.doReturn("mockChannel").when(this.mockChannel).toString();
final PCEPDispatcherImpl dispatcher2 = new PCEPDispatcherImpl(msgReg, new DefaultPCEPSessionNegotiatorFactory(sessionProposal, 0), eventLoopGroup, eventLoopGroup);
this.disp2Spy = Mockito.spy(dispatcher2);
this.pccMock = new PCCMock(new DefaultPCEPSessionNegotiatorFactory(sessionProposal, 0),
- new PCEPHandlerFactory(msgReg));
+ new PCEPHandlerFactory(msgReg));
}
@Test
final InetSocketAddress clientAddr1 = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
final InetSocketAddress clientAddr2 = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
final ChannelFuture futureChannel = this.dispatcher.createServer(serverAddr,
- () -> new SimpleSessionListener(), null);
+ SimpleSessionListener::new, null);
final PCEPSessionImpl session1 = (PCEPSessionImpl) this.pccMock.createClient(clientAddr1,
- RETRY_TIMER, CONNECT_TIMEOUT,
- () -> new SimpleSessionListener()).get();
+ RETRY_TIMER, CONNECT_TIMEOUT, SimpleSessionListener::new).get();
final PCEPSessionImpl session2 = (PCEPSessionImpl) this.pccMock.createClient(clientAddr2,
- RETRY_TIMER, CONNECT_TIMEOUT,
- () -> new SimpleSessionListener()).get();
+ RETRY_TIMER, CONNECT_TIMEOUT, SimpleSessionListener::new).get();
Assert.assertTrue(futureChannel.channel().isActive());
Assert.assertEquals(clientAddr1.getAddress().getHostAddress(), session1.getPeerPref().getIpAddress());
final int port = InetSocketAddressUtil.getRandomPort();
final InetSocketAddress serverAddr = new InetSocketAddress("0.0.0.0", port);
final InetSocketAddress clientAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
- this.dispatcher.createServer(serverAddr,
- () -> new SimpleSessionListener(), null);
+ this.dispatcher.createServer(serverAddr, SimpleSessionListener::new, null);
final PCEPSessionImpl session1 = (PCEPSessionImpl) this.pccMock.createClient(clientAddr,
- RETRY_TIMER, CONNECT_TIMEOUT,
- () -> new SimpleSessionListener()).get();
+ RETRY_TIMER, CONNECT_TIMEOUT, SimpleSessionListener::new).get();
try {
- this.pccMock.createClient(clientAddr,
- RETRY_TIMER, CONNECT_TIMEOUT,
- () -> new SimpleSessionListener()).get();
+ this.pccMock.createClient(clientAddr, RETRY_TIMER, CONNECT_TIMEOUT,
+ SimpleSessionListener::new).get();
Assert.fail();
} catch (final ExecutionException e) {
Assert.assertTrue(e.getMessage().contains("A conflicting session for address"));
final int port = InetSocketAddressUtil.getRandomPort();
final InetSocketAddress clientAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
this.dispatcher.createServer(new InetSocketAddress("0.0.0.0", port),
- () -> new SimpleSessionListener(), null);
+ SimpleSessionListener::new, null);
final PCEPSessionImpl session1 = (PCEPSessionImpl) this.pccMock.createClient(clientAddr,
- RETRY_TIMER, CONNECT_TIMEOUT,
- () -> new SimpleSessionListener()).get();
+ RETRY_TIMER, CONNECT_TIMEOUT, SimpleSessionListener::new).get();
Assert.assertEquals(clientAddr.getAddress(), session1.getRemoteAddress());
Assert.assertEquals(DEAD_TIMER, session1.getDeadTimerValue().shortValue());
session1.close();
final PCEPSessionImpl session2 = (PCEPSessionImpl) this.pccMock.createClient(clientAddr,
- RETRY_TIMER, CONNECT_TIMEOUT,
- () -> new SimpleSessionListener()).get();
+ RETRY_TIMER, CONNECT_TIMEOUT, SimpleSessionListener::new).get();
Assert.assertEquals(clientAddr.getAddress(), session1.getRemoteAddress());
Assert.assertEquals(DEAD_TIMER, session2.getDeadTimerValue().shortValue());
final int port = InetSocketAddressUtil.getRandomPort();
final InetSocketAddress clientAddr1 = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
final InetSocketAddress clientAddr2 = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
- final KeyMapping keys = KeyMapping.getKeyMapping(clientAddr1.getAddress(), new String("CLIENT1_ADDRESS"));
- keys.put(clientAddr2.getAddress(), new String("CLIENT2_ADDRESS").getBytes() );
+ final KeyMapping keys = KeyMapping.getKeyMapping(clientAddr1.getAddress(), "CLIENT1_ADDRESS");
+ keys.put(clientAddr2.getAddress(), "CLIENT2_ADDRESS".getBytes());
final ChannelFuture futureChannel = this.disp2Spy.createServer(new InetSocketAddress("0.0.0.0", port),
- () -> new SimpleSessionListener(), null);
+ SimpleSessionListener::new, null);
Mockito.verify(this.disp2Spy).createServerBootstrap(Mockito.any(PCEPDispatcherImpl.ChannelPipelineInitializer.class));
}
}
public Future<PCEPSession> createClient(final InetSocketAddress address, final int retryTimer,
- final int connectTimeout, final PCEPSessionListenerFactory listenerFactory) {
- return createClient(address, retryTimer, connectTimeout, (ChannelPipelineInitializer) (ch, promise) -> {
- ch.pipeline().addLast(PCCMock.this.factory.getDecoders());
- ch.pipeline().addLast("negotiator", PCCMock.this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise, null));
- ch.pipeline().addLast(PCCMock.this.factory.getEncoders());
+ final int connectTimeout, final PCEPSessionListenerFactory listenerFactory) {
+ return createClient(address, retryTimer, connectTimeout, (ch, promise) -> {
+ ch.pipeline().addLast(this.factory.getDecoders());
+ ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch,
+ promise, null));
+ ch.pipeline().addLast(this.factory.getEncoders());
});
}
Future<PCEPSession> createClient(final InetSocketAddress address, final int retryTimer, final int connectTimeout,
- final PCEPDispatcherImpl.ChannelPipelineInitializer initializer) {
+ final PCEPDispatcherImpl.ChannelPipelineInitializer initializer) {
final Bootstrap b = new Bootstrap();
- final PCEPProtocolSessionPromise p = new PCEPProtocolSessionPromise(this.executor, address, retryTimer, connectTimeout, b);
- (b.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(true))).handler(new ChannelInitializer<SocketChannel>() {
+ final PCEPProtocolSessionPromise p = new PCEPProtocolSessionPromise(this.executor, address, retryTimer,
+ connectTimeout, b);
+ (b.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel ch) {
initializer.initializeChannel(ch, p);
InetAddress currentAddress = this.localAddress.getAddress();
this.pccDispatcher = new PCCDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance()
.getMessageHandlerRegistry());
- if(timerHandler.isPresent()) {
+ if (timerHandler.isPresent()) {
timerHandler.get().setPCCDispatcher(this.pccDispatcher);
}
for (int i = 0; i < this.pccCount; i++) {
return promise;
}
- private void setChannelFactory(final Bootstrap bootstrap, final Optional<KeyMapping> keys) {
+ private static void setChannelFactory(final Bootstrap bootstrap, final Optional<KeyMapping> keys) {
if (Epoll.isAvailable()) {
bootstrap.channel(EpollSocketChannel.class);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout);
this.b.remoteAddress(this.address);
final ChannelFuture cf = this.b.connect();
- cf.addListener(new BootstrapConnectListener(PCCReconnectPromise.this));
+ cf.addListener(new BootstrapConnectListener(this));
this.pending = cf;
} catch (final Exception e) {
LOG.info("Failed to connect to {}", this.address, e);
synchronized (PCCReconnectPromise.this) {
PCCReconnectPromise.LOG.debug("Attempting to connect to {}", PCCReconnectPromise.this.address);
final Future reconnectFuture = PCCReconnectPromise.this.b.connect();
- reconnectFuture.addListener(BootstrapConnectListener.this);
+ reconnectFuture.addListener(this);
PCCReconnectPromise.this.pending = reconnectFuture;
}
}, PCCReconnectPromise.this.retryTimer, TimeUnit.SECONDS);
final TestingSessionListener sl2 = checkSessionListenerNotNull(slf2, this.clientAddress.getAddress().getHostAddress());
Assert.assertNotNull(sl2.getSession());
Assert.assertTrue(sl2.isUp());
+ channel2.close();
}
}