import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.protocol.bgp.rib.impl.StrictBGPPeerRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistrySessionListener;
import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private InetSocketAddress address;
private final int retryTimer;
private final Bootstrap bootstrap;
+ private final BGPPeerRegistry peerRegistry;
+ private final AutoCloseable listenerRegistration;
@GuardedBy("this")
private ChannelFuture pending;
+ @GuardedBy("this")
+ private boolean peerSessionPresent;
+ @GuardedBy("this")
+ private boolean connectSkipped;
+
- public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap) {
+ public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap, BGPPeerRegistry peerRegistry) {
super(GlobalEventExecutor.INSTANCE);
this.address = Preconditions.checkNotNull(remoteAddress);
this.retryTimer = retryTimer;
this.bootstrap = Preconditions.checkNotNull(bootstrap);
+ this.peerRegistry = Preconditions.checkNotNull(peerRegistry);
+ this.listenerRegistration = this.peerRegistry
+ .registerPeerSessionListener(new BGPProtocolSessionPromise.PeerRegistrySessionListenerImpl(this,
+ StrictBGPPeerRegistry.getIpAddress(this.address)));
}
public synchronized void connect() {
- final BGPProtocolSessionPromise lock = this;
+ if (this.peerSessionPresent) {
+ LOG.debug("Connection to {} already exists", this.address);
+ this.connectSkipped = true;
+ return;
+ } else {
+ this.connectSkipped = false;
+ }
+ final BGPProtocolSessionPromise lock = this;
try {
LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(CONNECT_TIMEOUT));
if (this.address.isUnresolved()) {
loop.schedule(new Runnable() {
@Override
public void run() {
+ if (BGPProtocolSessionPromise.this.peerSessionPresent) {
+ LOG.debug("Connection to {} already exists", BGPProtocolSessionPromise.this.address);
+ BGPProtocolSessionPromise.this.connectSkipped = true;
+ return;
+ } else {
+ BGPProtocolSessionPromise.this.connectSkipped = false;
+ }
LOG.debug("Attempting to connect to {}", BGPProtocolSessionPromise.this.address);
final ChannelFuture reconnectFuture = BGPProtocolSessionPromise.this.bootstrap.connect();
reconnectFuture.addListener(new BootstrapConnectListener(lock));
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ closePeerSessionListener();
if (super.cancel(mayInterruptIfRunning)) {
+ Preconditions.checkNotNull(this.pending);
this.pending.cancel(mayInterruptIfRunning);
return true;
} else {
}
}
+ private void closePeerSessionListener() {
+ try {
+ this.listenerRegistration.close();
+ } catch (final Exception e) {
+ LOG.debug("Exception encountered while closing peer registry session listener registration", e);
+ }
+ }
+
@Override
public synchronized Promise<S> setSuccess(final S result) {
LOG.debug("Promise {} completed", this);
}
}
}
-}
\ No newline at end of file
+
+ private class PeerRegistrySessionListenerImpl implements PeerRegistrySessionListener {
+ private final Object lock;
+ private final IpAddress peerAddress;
+
+ PeerRegistrySessionListenerImpl(final Object lock, final IpAddress peerAddress) {
+ this.lock = lock;
+ this.peerAddress = peerAddress;
+ }
+
+ @Override
+ public void onSessionCreated(@Nonnull final IpAddress ip) {
+ if (!ip.equals(this.peerAddress)) {
+ return;
+ }
+ BGPProtocolSessionPromise.LOG.debug("Callback for session creation with peer {} received", ip);
+ synchronized (this.lock) {
+ BGPProtocolSessionPromise.this.peerSessionPresent = true;
+ }
+ }
+
+ @Override
+ public void onSessionRemoved(@Nonnull final IpAddress ip) {
+ if (!ip.equals(this.peerAddress)) {
+ return;
+ }
+ BGPProtocolSessionPromise.LOG.debug("Callback for session removal with peer {} received", ip);
+ synchronized (this.lock) {
+ BGPProtocolSessionPromise.this.peerSessionPresent = false;
+ if (BGPProtocolSessionPromise.this.connectSkipped) {
+ BGPProtocolSessionPromise.this.connect();
+ }
+ }
+ }
+ }
+
+}