import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
+import javax.annotation.Nonnull;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
private final ChannelPipelineInitializer initializer;
private BGPProtocolSessionPromise<S> pending;
- public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
- final int retryTimer, final Bootstrap bootstrap,
- final BGPPeerRegistry peerRegistry,
- final ChannelPipelineInitializer initializer) {
+ public BGPReconnectPromise(@Nonnull final EventExecutor executor, @Nonnull final InetSocketAddress address,
+ final int retryTimer, @Nonnull final Bootstrap bootstrap, @Nonnull final BGPPeerRegistry peerRegistry,
+ @Nonnull final ChannelPipelineInitializer initializer) {
super(executor);
this.bootstrap = bootstrap;
this.initializer = Preconditions.checkNotNull(initializer);
}
// Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
- this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, new ChannelPipelineInitializer<S>() {
- @Override
- public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
- BGPReconnectPromise.this.initializer.initializeChannel(channel, promise);
- // add closed channel handler
- // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
- // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
- // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
- channel.pipeline().addLast(new ClosedChannelHandler(BGPReconnectPromise.this));
- }
+ this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, (channel, promise) -> {
+ this.initializer.initializeChannel(channel, promise);
+ // add closed channel handler
+ // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+ // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+ // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+ channel.pipeline().addLast(new ClosedChannelHandler(this));
});
- this.pending.addListener(new GenericFutureListener<Future<Object>>() {
- @Override
- public void operationComplete(final Future<Object> future) throws Exception {
- if (!future.isSuccess() && !BGPReconnectPromise.this.isDone()) {
- BGPReconnectPromise.this.setFailure(future.cause());
- }
+ this.pending.addListener(future -> {
+ if (!future.isSuccess() && !this.isDone()) {
+ this.setFailure(future.cause());
}
});
}
- public BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
+ private BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
final BGPPeerRegistry peerRegistry, final ChannelPipelineInitializer initializer) {
final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap, peerRegistry);
final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
/**
* @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
*/
- private boolean isInitialConnectFinished() {
+ private synchronized boolean isInitialConnectFinished() {
Preconditions.checkNotNull(this.pending);
return this.pending.isDone() && this.pending.isSuccess();
}
- private void reconnect() {
+ private synchronized void reconnect() {
Preconditions.checkNotNull(this.pending);
this.pending.reconnect();
}
private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
private final BGPReconnectPromise promise;
- public ClosedChannelHandler(final BGPReconnectPromise promise) {
+ ClosedChannelHandler(final BGPReconnectPromise promise) {
this.promise = promise;
}