final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(listener);
final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf);
- final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(remoteAddress, retryTimer, clientBootStrap);
+ final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(remoteAddress, retryTimer, clientBootStrap, listener);
clientBootStrap.handler(BGPChannel.createClientChannelHandler(initializer, sessionPromise));
sessionPromise.connect();
LOG.debug("Client created.");
final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(peerRegistry);
final Bootstrap bootstrap = createClientBootStrap(keys, this.workerGroup);
final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, remoteAddress,
- retryTimer, bootstrap, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf));
+ retryTimer, bootstrap, peerRegistry, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf));
reconnectPromise.connect();
return reconnectPromise;
}
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistryListener;
+import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistrySessionListener;
import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
private final Map<IpAddress, BGPSessionPreferences> peerPreferences = Maps.newHashMap();
@GuardedBy("this")
private final Set<PeerRegistryListener> listeners = new HashSet<>();
+ @GuardedBy("this")
+ private final Set<PeerRegistrySessionListener> sessionListeners = new HashSet<>();
public static BGPPeerRegistry instance() {
return GLOBAL;
public synchronized void removePeerSession(final IpAddress ip) {
Preconditions.checkNotNull(ip);
this.sessionIds.remove(ip);
+ for (final PeerRegistrySessionListener peerRegistrySessionListener : this.sessionListeners) {
+ peerRegistrySessionListener.onSessionRemoved(ip);
+ }
}
@Override
// Map session id to peer IP address
this.sessionIds.put(ip, currentConnection);
+ for (final PeerRegistrySessionListener peerRegistrySessionListener : this.sessionListeners) {
+ peerRegistrySessionListener.onSessionCreated(ip);
+ }
return p;
}
}
};
}
-}
\ No newline at end of file
+
+ @Override
+ public synchronized AutoCloseable registerPeerSessionListener(final PeerRegistrySessionListener listener) {
+ this.sessionListeners.add(listener);
+ for (final IpAddress ipAddress : this.sessionIds.keySet()) {
+ listener.onSessionCreated(ipAddress);
+ }
+ return new AbstractRegistration() {
+ @Override
+ protected void removeRegistration() {
+ synchronized (StrictBGPPeerRegistry.this) {
+ StrictBGPPeerRegistry.this.sessionListeners.remove(listener);
+ }
+ }
+ };
+ }
+}
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.protocol.bgp.rib.impl.StrictBGPPeerRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistrySessionListener;
import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private InetSocketAddress address;
private final int retryTimer;
private final Bootstrap bootstrap;
+ private final BGPPeerRegistry peerRegistry;
+ private final AutoCloseable listenerRegistration;
@GuardedBy("this")
private ChannelFuture pending;
+ @GuardedBy("this")
+ private boolean peerSessionPresent;
+ @GuardedBy("this")
+ private boolean connectSkipped;
+
- public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap) {
+ public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap, BGPPeerRegistry peerRegistry) {
super(GlobalEventExecutor.INSTANCE);
this.address = Preconditions.checkNotNull(remoteAddress);
this.retryTimer = retryTimer;
this.bootstrap = Preconditions.checkNotNull(bootstrap);
+ this.peerRegistry = Preconditions.checkNotNull(peerRegistry);
+ this.listenerRegistration = this.peerRegistry
+ .registerPeerSessionListener(new BGPProtocolSessionPromise.PeerRegistrySessionListenerImpl(this,
+ StrictBGPPeerRegistry.getIpAddress(this.address)));
}
public synchronized void connect() {
- final BGPProtocolSessionPromise lock = this;
+ if (this.peerSessionPresent) {
+ LOG.debug("Connection to {} already exists", this.address);
+ this.connectSkipped = true;
+ return;
+ } else {
+ this.connectSkipped = false;
+ }
+ final BGPProtocolSessionPromise lock = this;
try {
LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(CONNECT_TIMEOUT));
if (this.address.isUnresolved()) {
loop.schedule(new Runnable() {
@Override
public void run() {
+ if (BGPProtocolSessionPromise.this.peerSessionPresent) {
+ LOG.debug("Connection to {} already exists", BGPProtocolSessionPromise.this.address);
+ BGPProtocolSessionPromise.this.connectSkipped = true;
+ return;
+ } else {
+ BGPProtocolSessionPromise.this.connectSkipped = false;
+ }
LOG.debug("Attempting to connect to {}", BGPProtocolSessionPromise.this.address);
final ChannelFuture reconnectFuture = BGPProtocolSessionPromise.this.bootstrap.connect();
reconnectFuture.addListener(new BootstrapConnectListener(lock));
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ closePeerSessionListener();
if (super.cancel(mayInterruptIfRunning)) {
+ Preconditions.checkNotNull(this.pending);
this.pending.cancel(mayInterruptIfRunning);
return true;
} else {
}
}
+ private void closePeerSessionListener() {
+ try {
+ this.listenerRegistration.close();
+ } catch (final Exception e) {
+ LOG.debug("Exception encountered while closing peer registry session listener registration", e);
+ }
+ }
+
@Override
public synchronized Promise<S> setSuccess(final S result) {
LOG.debug("Promise {} completed", this);
}
}
}
-}
\ No newline at end of file
+
+ private class PeerRegistrySessionListenerImpl implements PeerRegistrySessionListener {
+ private final Object lock;
+ private final IpAddress peerAddress;
+
+ PeerRegistrySessionListenerImpl(final Object lock, final IpAddress peerAddress) {
+ this.lock = lock;
+ this.peerAddress = peerAddress;
+ }
+
+ @Override
+ public void onSessionCreated(@Nonnull final IpAddress ip) {
+ if (!ip.equals(this.peerAddress)) {
+ return;
+ }
+ BGPProtocolSessionPromise.LOG.debug("Callback for session creation with peer {} received", ip);
+ synchronized (this.lock) {
+ BGPProtocolSessionPromise.this.peerSessionPresent = true;
+ }
+ }
+
+ @Override
+ public void onSessionRemoved(@Nonnull final IpAddress ip) {
+ if (!ip.equals(this.peerAddress)) {
+ return;
+ }
+ BGPProtocolSessionPromise.LOG.debug("Callback for session removal with peer {} received", ip);
+ synchronized (this.lock) {
+ BGPProtocolSessionPromise.this.peerSessionPresent = false;
+ if (BGPProtocolSessionPromise.this.connectSkipped) {
+ BGPProtocolSessionPromise.this.connect();
+ }
+ }
+ }
+ }
+
+}
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
+import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
import org.slf4j.Logger;
private final InetSocketAddress address;
private final int retryTimer;
private final Bootstrap bootstrap;
+ private final BGPPeerRegistry peerRegistry;
private final ChannelPipelineInitializer initializer;
private BGPProtocolSessionPromise<S> pending;
public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
final int retryTimer, final Bootstrap bootstrap,
+ final BGPPeerRegistry peerRegistry,
final ChannelPipelineInitializer initializer) {
super(executor);
this.bootstrap = bootstrap;
this.initializer = Preconditions.checkNotNull(initializer);
this.address = Preconditions.checkNotNull(address);
this.retryTimer = retryTimer;
+ this.peerRegistry = Preconditions.checkNotNull(peerRegistry);
}
public synchronized void connect() {
+ if (this.pending != null) {
+ this.pending.cancel(true);
+ }
+
// Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
- this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, new ChannelPipelineInitializer<S>() {
+ this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, new ChannelPipelineInitializer<S>() {
@Override
public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
BGPReconnectPromise.this.initializer.initializeChannel(channel, promise);
this.pending.addListener(new GenericFutureListener<Future<Object>>() {
@Override
public void operationComplete(final Future<Object> future) throws Exception {
- if (!future.isSuccess()) {
+ if (!future.isSuccess() && !BGPReconnectPromise.this.isDone()) {
BGPReconnectPromise.this.setFailure(future.cause());
}
}
}
public BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
- final ChannelPipelineInitializer initializer) {
- final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap);
+ final BGPPeerRegistry peerRegistry, final ChannelPipelineInitializer initializer) {
+ final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap, peerRegistry);
final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel channel) {
return this.pending.isDone() && this.pending.isSuccess();
}
+ private void reconnect() {
+ Preconditions.checkNotNull(this.pending);
+ this.pending.reconnect();
+ }
+
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
if (!this.promise.isInitialConnectFinished()) {
LOG.debug("Connection to {} was dropped during negotiation, reattempting", this.promise.address);
- this.promise.pending.reconnect();
+ this.promise.reconnect();
return;
}
*/
@Nonnull AutoCloseable registerPeerRegisterListener(@Nonnull PeerRegistryListener listener);
+ /**
+ * Register PeerRegistrySessionListener, which listens to the changes in sessions
+ * of peers in peer registry (create session, remove session). After registration,
+ * an initial drop is provided by calling onSessionCreated().
+ *
+ * @param listener The PeerRegistrySessionListener to be registered.
+ * @return Registration ticked, used for closing of registration.
+ */
+ @Nonnull AutoCloseable registerPeerSessionListener(PeerRegistrySessionListener listener);
+
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.bgp.rib.impl.spi;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+
+/**
+ * Listens to the session changes for peers in a PeerRegisty.
+ *
+ */
+public interface PeerRegistrySessionListener {
+
+ /**
+ * Invoked when new peer session is created.
+ * @param ip The peer's IP address.
+ */
+ void onSessionCreated(@Nonnull IpAddress ip);
+
+ /**
+ * Invoked when peer session is removed.
+ * @param ip The peer's IP address.
+ */
+ void onSessionRemoved(@Nonnull IpAddress ip);
+
+}
import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
import org.opendaylight.protocol.bgp.parser.BGPError;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
+import org.opendaylight.protocol.bgp.rib.impl.spi.PeerRegistrySessionListener;
import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
return mock;
}
+ private static PeerRegistrySessionListener getMockSessionListener() {
+ final PeerRegistrySessionListener mock = Mockito.mock(PeerRegistrySessionListener.class);
+ Mockito.doNothing().when(mock).onSessionCreated(Mockito.any(IpAddress.class));
+ Mockito.doNothing().when(mock).onSessionRemoved(Mockito.any(IpAddress.class));
+ return mock;
+ }
+
@Test
public void testIpAddressConstruction() throws Exception {
final InetSocketAddress adr = new InetSocketAddress("127.0.0.1", 179);
}
fail("Peer AS number mismatch");
}
+
+ @Test
+ public void testRegisterPeerSessionListener() throws Exception {
+ final PeerRegistrySessionListener sessionListener1 = getMockSessionListener();
+ this.peerRegistry.registerPeerSessionListener(sessionListener1);
+
+ final PeerRegistrySessionListener sessionListener2 = getMockSessionListener();
+ this.peerRegistry.registerPeerSessionListener(sessionListener2);
+
+ this.peerRegistry.addPeer(REMOTE_IP, this.peer1, this.mockPreferences);
+ this.peerRegistry.getPeer(REMOTE_IP, FROM, TO, this.classicOpen);
+ Mockito.verify(sessionListener1, Mockito.times(1)).onSessionCreated(REMOTE_IP);
+ Mockito.verify(sessionListener2, Mockito.times(1)).onSessionCreated(REMOTE_IP);
+
+ this.peerRegistry.removePeerSession(REMOTE_IP);
+ Mockito.verify(sessionListener1, Mockito.times(1)).onSessionRemoved(REMOTE_IP);
+ Mockito.verify(sessionListener2, Mockito.times(1)).onSessionRemoved(REMOTE_IP);
+ }
+
+ @Test
+ public void testClosePeerSessionOneListener() throws Exception {
+ final PeerRegistrySessionListener sessionListener1 = getMockSessionListener();
+ final AutoCloseable registration1 = this.peerRegistry.registerPeerSessionListener(sessionListener1);
+
+ final PeerRegistrySessionListener sessionListener2 = getMockSessionListener();
+ this.peerRegistry.registerPeerSessionListener(sessionListener2);
+
+ this.peerRegistry.addPeer(REMOTE_IP, this.peer1, this.mockPreferences);
+ this.peerRegistry.getPeer(REMOTE_IP, FROM, TO, this.classicOpen);
+ this.peerRegistry.removePeerSession(REMOTE_IP);
+
+ registration1.close();
+ this.peerRegistry.getPeer(REMOTE_IP, FROM, TO, this.classicOpen);
+ this.peerRegistry.removePeerSession(REMOTE_IP);
+
+ Mockito.verify(sessionListener1, Mockito.times(1)).onSessionCreated(REMOTE_IP);
+ Mockito.verify(sessionListener2, Mockito.times(2)).onSessionCreated(REMOTE_IP);
+ Mockito.verify(sessionListener1, Mockito.times(1)).onSessionRemoved(REMOTE_IP);
+ Mockito.verify(sessionListener2, Mockito.times(2)).onSessionRemoved(REMOTE_IP);
+ }
}