import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
+import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
import org.slf4j.Logger;
private final InetSocketAddress address;
private final int retryTimer;
private final Bootstrap bootstrap;
+ private final BGPPeerRegistry peerRegistry;
private final ChannelPipelineInitializer initializer;
private BGPProtocolSessionPromise<S> pending;
public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
final int retryTimer, final Bootstrap bootstrap,
+ final BGPPeerRegistry peerRegistry,
final ChannelPipelineInitializer initializer) {
super(executor);
this.bootstrap = bootstrap;
this.initializer = Preconditions.checkNotNull(initializer);
this.address = Preconditions.checkNotNull(address);
this.retryTimer = retryTimer;
+ this.peerRegistry = Preconditions.checkNotNull(peerRegistry);
}
public synchronized void connect() {
+ if (this.pending != null) {
+ this.pending.cancel(true);
+ }
+
// Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
- this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, new ChannelPipelineInitializer<S>() {
+ this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, new ChannelPipelineInitializer<S>() {
@Override
public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
BGPReconnectPromise.this.initializer.initializeChannel(channel, promise);
this.pending.addListener(new GenericFutureListener<Future<Object>>() {
@Override
public void operationComplete(final Future<Object> future) throws Exception {
- if (!future.isSuccess()) {
+ if (!future.isSuccess() && !BGPReconnectPromise.this.isDone()) {
BGPReconnectPromise.this.setFailure(future.cause());
}
}
}
public BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
- final ChannelPipelineInitializer initializer) {
- final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap);
+ final BGPPeerRegistry peerRegistry, final ChannelPipelineInitializer initializer) {
+ final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap, peerRegistry);
final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel channel) {
return this.pending.isDone() && this.pending.isSuccess();
}
+ private void reconnect() {
+ Preconditions.checkNotNull(this.pending);
+ this.pending.reconnect();
+ }
+
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
if (!this.promise.isInitialConnectFinished()) {
LOG.debug("Connection to {} was dropped during negotiation, reattempting", this.promise.address);
- this.promise.pending.reconnect();
+ this.promise.reconnect();
return;
}