import io.netty.util.concurrent.GlobalEventExecutor;
-import java.io.IOException;
-
import org.opendaylight.controller.config.api.JmxAttributeValidationException;
import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.protocol.bgp.rib.impl.BGPPeer;
import org.opendaylight.protocol.bgp.rib.impl.RIBImpl;
import org.opendaylight.protocol.concepts.ListenerRegistration;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.protocol.framework.TimedReconnectStrategy;
import org.osgi.framework.BundleContext;
BGP bgp = getBgpDependency();
final BGPPeer peer = new BGPPeer(rib, "peer-" + bgp.toString());
- try {
- final long reconnects = getReconnectAttempts();
- ListenerRegistration<BGPSessionListener> reg = bgp
- .registerUpdateListener(peer, new TimedReconnectStrategy(
- GlobalEventExecutor.INSTANCE,
- getConnectionTimeout(), 5000, 1.0, null,
- reconnects, null));
- return new RibImplCloseable(reg, rib);
- } catch (IOException e) {
- throw new RuntimeException("Failed to register with BGP", e);
- }
+ final long reconnects = getReconnectAttempts();
+ ListenerRegistration<BGPSessionListener> reg = bgp
+ .registerUpdateListener(peer,
+ new ReconnectStrategyFactory() {
+ @Override
+ public ReconnectStrategy createReconnectStrategy() {
+ return new TimedReconnectStrategy(
+ GlobalEventExecutor.INSTANCE,
+ getConnectionTimeout(), 5000, 1.0, null,
+ reconnects, null);
+ }
+ }, new TimedReconnectStrategy(
+ GlobalEventExecutor.INSTANCE,
+ getConnectionTimeout(), 5000, 1.0, null,
+ reconnects, null));
+ return new RibImplCloseable(reg, rib);
}
private static final class RibImplCloseable implements AutoCloseable {
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
import org.opendaylight.protocol.concepts.ListenerRegistration;
import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
/**
* BGP interface. At this time it only supports listening to changes seen by the backing device, typically a network
* needs to be explicitly closed in order to stop receiving the updates.
*
* @param listener {@link BGPSessionListener}
- * @param strategy {@link ReconnectStrategy} to use for TCP-level retries
+ * @param tcpStrategyFactory {@link ReconnectStrategyFactory} to use for creating TCP-level retry strategies
+ * @param sessionStrategy {@link ReconnectStrategy} to use for session-level retries
* @throws IllegalStateException if there is already a listener registered
* @throws IOException if some IO error occurred
* @return ListenerRegistration
*/
- public ListenerRegistration<BGPSessionListener> registerUpdateListener(BGPSessionListener listener, ReconnectStrategy strategy) throws IOException;
+ public ListenerRegistration<BGPSessionListener> registerUpdateListener(
+ BGPSessionListener listener, ReconnectStrategyFactory tcpStrategyFactory, ReconnectStrategy sessionStrategy);
}
import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
+
+import java.net.InetSocketAddress;
+
import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
import org.opendaylight.protocol.framework.AbstractDispatcher;
import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.protocol.framework.SessionListenerFactory;
-import java.net.InetSocketAddress;
-
/**
* Implementation of BGPDispatcher.
*/
private final BGPHandlerFactory hf;
- public BGPDispatcherImpl(final BGPMessageFactory parser, EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+ public BGPDispatcherImpl(final BGPMessageFactory parser, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
super(bossGroup, workerGroup);
this.hf = new BGPHandlerFactory(parser);
}
});
}
+ @Override
+ public Future<Void> createReconnectingClient(final InetSocketAddress address,
+ final BGPSessionPreferences preferences, final BGPSessionListener listener,
+ final ReconnectStrategyFactory connectStrategyFactory,
+ final ReconnectStrategy reestablishStrategy) {
+ final BGPSessionNegotiatorFactory snf = new BGPSessionNegotiatorFactory(this.timer, preferences);
+ final SessionListenerFactory<BGPSessionListener> slf = new SessionListenerFactory<BGPSessionListener>() {
+ @Override
+ public BGPSessionListener getSessionListener() {
+ return listener;
+ }
+ };
+
+ return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy, new PipelineInitializer<BGPSessionImpl>() {
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
+ ch.pipeline().addLast(hf.getDecoders());
+ ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(slf, ch, promise));
+ ch.pipeline().addLast(hf.getEncoders());
+ }
+ });
+ }
+
@Override
public void close() {
}
*/
package org.opendaylight.protocol.bgp.rib.impl;
+import io.netty.util.concurrent.Future;
+
import java.io.Closeable;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutionException;
-import org.opendaylight.protocol.bgp.parser.BGPSession;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionProposal;
import org.opendaylight.protocol.concepts.ListenerRegistration;
import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import com.google.common.base.Preconditions;
* Implementation of {@link BGP}.
*/
public class BGPImpl implements BGP, Closeable {
- /**
- * Wrapper class to give listener a close method.
- */
- public class BGPListenerRegistration extends ListenerRegistration<BGPSessionListener> {
- private final BGPSession session;
-
- public BGPListenerRegistration(final BGPSessionListener l, final BGPSession session) {
- super(l);
- this.session = session;
- }
-
- @Override
- public void removeRegistration() {
- this.session.close();
- }
- }
-
private final BGPDispatcher dispatcher;
private final InetSocketAddress address;
this.proposal = Preconditions.checkNotNull(proposal);
}
- /**
- * {@inheritDoc}
- */
@Override
- public BGPListenerRegistration registerUpdateListener(final BGPSessionListener listener, final ReconnectStrategy strategy) throws IOException {
- final BGPSession session;
- try {
- session = this.dispatcher.createClient(this.address, this.proposal.getProposal(), listener, strategy).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException("Failed to connect to peer", e);
- }
- return new BGPListenerRegistration(listener, session);
+ public ListenerRegistration<BGPSessionListener> registerUpdateListener(final BGPSessionListener listener, final ReconnectStrategyFactory tcpStrategyFactory, final ReconnectStrategy sessionStrategy) {
+ final Future<Void> s = this.dispatcher.createReconnectingClient(address, this.proposal.getProposal(), listener, tcpStrategyFactory, sessionStrategy);
+ return new ListenerRegistration<BGPSessionListener>(listener) {
+ @Override
+ protected void removeRegistration() {
+ s.cancel(true);
+ }
+ };
}
@Override
}
}
- @Override
- public void onSessionDown(final BGPSession session, final Exception e) {
+ private void cleanup() {
// FIXME: support graceful restart
for (final TablesKey key : this.tables) {
this.rib.clearTable(this, key);
this.tables.clear();
}
+ @Override
+ public void onSessionDown(final BGPSession session, final Exception e) {
+ logger.info("Session with peer {} went down", this.name, e);
+ cleanup();
+ }
+
@Override
public void onSessionTerminated(final BGPSession session, final BGPTerminationReason cause) {
logger.info("Session with peer {} terminated: {}", this.name, cause);
+ cleanup();
}
@Override
- public String toString() {
+ public final String toString() {
return addToStringAttributes(Objects.toStringHelper(this)).toString();
}
import org.opendaylight.protocol.bgp.parser.BGPSession;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
/**
* Dispatcher class for creating BGP clients.
*/
Future<? extends BGPSession> createClient(InetSocketAddress address, BGPSessionPreferences preferences, BGPSessionListener listener,
final ReconnectStrategy strategy);
+
+ Future<Void> createReconnectingClient(InetSocketAddress address, BGPSessionPreferences preferences, BGPSessionListener listener,
+ ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy);
}
import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
import org.opendaylight.protocol.bgp.parser.BGPSession;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
-import org.opendaylight.protocol.bgp.rib.impl.BGPImpl.BGPListenerRegistration;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionProposal;
+import org.opendaylight.protocol.concepts.ListenerRegistration;
import org.opendaylight.protocol.framework.NeverReconnectStrategy;
import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130918.open.BgpParameters;
public class BGPImplTest {
private BGPMessageFactory parser;
@Mock
- private Future<BGPSession> future;
+ private Future<Void> future;
private BGPImpl bgp;
MockitoAnnotations.initMocks(this);
doReturn("").when(this.parser).toString();
- doReturn(null).when(this.future).get();
- doReturn(this.future).when(this.disp).createClient(any(InetSocketAddress.class), any(BGPSessionPreferences.class),
- any(BGPSessionListener.class), any(ReconnectStrategy.class));
+ doReturn(this.future).when(this.disp).createReconnectingClient(any(InetSocketAddress.class), any(BGPSessionPreferences.class),
+ any(BGPSessionListener.class), any(ReconnectStrategyFactory.class), any(ReconnectStrategy.class));
}
@Test
public void testBgpImpl() throws Exception {
doReturn(new BGPSessionPreferences(0, 0, null, Collections.<BgpParameters> emptyList())).when(this.prop).getProposal();
this.bgp = new BGPImpl(this.disp, new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), this.prop);
- final BGPListenerRegistration reg = this.bgp.registerUpdateListener(new SimpleSessionListener(),
- new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
+ final ListenerRegistration<?> reg = this.bgp.registerUpdateListener(new SimpleSessionListener(),
+ new ReconnectStrategyFactory() {
+ @Override
+ public ReconnectStrategy createReconnectStrategy() {
+ return new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000);
+ }
+ }, new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
assertEquals(SimpleSessionListener.class, reg.getListener().getClass());
}
import org.opendaylight.protocol.framework.DocumentedException;
import org.opendaylight.protocol.framework.ProtocolMessageFactory;
import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.protocol.util.ByteArray;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130918.NotifyBuilder;
import org.opendaylight.yangtools.yang.binding.Notification;
return messages;
}
- /**
- * @param listener BGPListener
- * @return ListenerRegistration
- */
- @Override
- public synchronized ListenerRegistration<BGPSessionListener> registerUpdateListener(final BGPSessionListener listener,
- final ReconnectStrategy strategy) {
- return EventBusRegistration.createAndRegister(this.eventBus, listener, this.allPreviousBGPMessages);
- }
-
public synchronized void insertConnectionLostEvent() {
this.insertMessage(CONNECTION_LOST_MAGIC_MSG);
}
public EventBus getEventBus() {
return this.eventBus;
}
+
+ @Override
+ public ListenerRegistration<BGPSessionListener> registerUpdateListener(
+ final BGPSessionListener listener,
+ final ReconnectStrategyFactory tcpStrategyFactory,
+ final ReconnectStrategy sessionStrategy) {
+ return EventBusRegistration.createAndRegister(this.eventBus, listener, this.allPreviousBGPMessages);
+ }
}