*/
package org.opendaylight.protocol.bgp.parser;
+import java.io.Serializable;
+
/**
* Caret for combination of Error-type and Error-value
*/
-final class BGPErrorIdentifier {
+final class BGPErrorIdentifier implements Serializable {
+ private static final long serialVersionUID = 5722575354944165734L;
private final short code;
private final short subcode;
}
final RIB rib = getRibDependency();
- return rib.getDispatcher().createReconnectingClient(address, remoteAs, registry, rib.getTcpStrategyFactory(),
- rib.getSessionStrategyFactory(), keys);
+ return rib.getDispatcher().createReconnectingClient(address, remoteAs, registry, rib.getTcpStrategyFactory(), keys);
}
private BGPPeerRegistry getPeerRegistryBackwards() {
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator;
import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
+import org.opendaylight.protocol.bgp.rib.spi.SessionNegotiator;
import org.opendaylight.protocol.util.Values;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
* Bgp Session negotiator. Common for local-to-remote and remote-to-local connections.
* One difference is session validation performed by injected BGPSessionValidator when OPEN message is received.
*/
-public abstract class AbstractBGPSessionNegotiator extends AbstractSessionNegotiator<Notification, BGPSessionImpl> {
+public abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandlerAdapter implements SessionNegotiator {
// 4 minutes recommended in http://tools.ietf.org/html/rfc4271#section-8.2.2
- protected static final int INITIAL_HOLDTIMER = 4;
+ private static final int INITIAL_HOLDTIMER = 4;
/**
* @see <a href="http://tools.ietf.org/html/rfc6793">BGP Support for 4-Octet AS Number Space</a>
private static final Logger LOG = LoggerFactory.getLogger(AbstractBGPSessionNegotiator.class);
private final BGPPeerRegistry registry;
private final BGPSessionValidator sessionValidator;
-
+ private final Promise<BGPSessionImpl> promise;
+ private final Channel channel;
@GuardedBy("this")
private State state = State.IDLE;
public AbstractBGPSessionNegotiator(final Promise<BGPSessionImpl> promise, final Channel channel,
final BGPPeerRegistry registry, final BGPSessionValidator sessionValidator) {
- super(promise, channel);
+ this.promise = Preconditions.checkNotNull(promise);
+ this.channel = Preconditions.checkNotNull(channel);
this.registry = registry;
this.sessionValidator = sessionValidator;
}
- @Override
- protected synchronized void startNegotiation() {
+ private synchronized void startNegotiation() {
Preconditions.checkState(this.state == State.IDLE);
// Check if peer is configured in registry before retrieving preferences
return StrictBGPPeerRegistry.getIpAddress(this.channel.remoteAddress());
}
- @Override
protected synchronized void handleMessage(final Notification msg) {
LOG.debug("Channel {} handling message in state {}", this.channel, this.state);
}
}
- @Override
- protected void negotiationFailed(final Throwable e) {
+ private void negotiationFailed(final Throwable e) {
LOG.warn("Channel {} negotiation failed: {}", this.channel, e.getMessage());
if (e instanceof BGPDocumentedException) {
// although sendMessage() can also result in calling this method, it won't create a cycle. In case sendMessage() fails to
this.sendMessage(buildErrorNotify(((BGPDocumentedException)e).getError(), ((BGPDocumentedException) e).getData()));
}
this.registry.removePeerSession(getRemoteIp());
- super.negotiationFailed(e);
+ negotiationFailedCloseChannel(e);
this.state = State.FINISHED;
}
public synchronized State getState() {
return this.state;
}
+
+ private final void negotiationSuccessful(BGPSessionImpl session) {
+ LOG.debug("Negotiation on channel {} successful with session {}", this.channel, session);
+ channel.pipeline().replace(this, "session", session);
+ promise.setSuccess(session);
+ }
+
+ private void negotiationFailedCloseChannel(Throwable cause) {
+ LOG.debug("Negotiation on channel {} failed", this.channel, cause);
+ channel.close();
+ promise.setFailure(cause);
+ }
+
+ private final void sendMessage(final Notification msg) {
+ channel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture f) {
+ if (!f.isSuccess()) {
+ LOG.info("Failed to send message {}", msg, f.cause());
+ negotiationFailedCloseChannel(f.cause());
+ } else {
+ LOG.trace("Message {} sent to socket", msg);
+ }
+
+ }
+ });
+ }
+
+ @Override
+ public final void channelActive(ChannelHandlerContext ctx) {
+ LOG.debug("Starting session negotiation on channel {}", this.channel);
+
+ try {
+ this.startNegotiation();
+ } catch (final Exception e) {
+ LOG.warn("Unexpected negotiation failure", e);
+ negotiationFailedCloseChannel(e);
+ }
+
+ }
+
+ @Override
+ public final void channelRead(ChannelHandlerContext ctx, Object msg) {
+ LOG.debug("Negotiation read invoked on channel {}", this.channel);
+
+ try {
+ handleMessage((Notification) msg);
+ } catch (Exception e) {
+ LOG.debug("Unexpected error while handling negotiation message {}", msg, e);
+ negotiationFailedCloseChannel(e);
+ }
+
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOG.info("Unexpected error during negotiation", cause);
+ negotiationFailedCloseChannel(cause);
+ }
}
import io.netty.channel.Channel;
import io.netty.util.concurrent.Promise;
-
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
-import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.SessionListenerFactory;
-import org.opendaylight.protocol.framework.SessionNegotiator;
-import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
+import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
+import org.opendaylight.protocol.bgp.rib.spi.SessionNegotiator;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
-import org.opendaylight.yangtools.yang.binding.Notification;
-public final class BGPClientSessionNegotiatorFactory implements SessionNegotiatorFactory<Notification, BGPSessionImpl, BGPSessionListener> {
+public final class BGPClientSessionNegotiatorFactory implements BGPSessionNegotiatorFactory<BGPSessionImpl> {
private final BGPClientSessionValidator validator;
private final BGPPeerRegistry peerRegistry;
}
@Override
- public SessionNegotiator<BGPSessionImpl> getSessionNegotiator(final SessionListenerFactory<BGPSessionListener> factory,
- final Channel channel, final Promise<BGPSessionImpl> promise) {
+ public SessionNegotiator getSessionNegotiator(final Channel channel, final Promise<BGPSessionImpl> promise) {
return new BGPClientSessionNegotiator(promise, channel, peerRegistry, validator);
}
}
*/
package org.opendaylight.protocol.bgp.rib.impl;
+import com.google.common.base.Preconditions;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise;
+import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator;
-import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.AbstractDispatcher;
+import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
import org.opendaylight.protocol.framework.ReconnectStrategy;
import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.tcpmd5.api.KeyMapping;
import org.opendaylight.tcpmd5.netty.MD5ChannelOption;
import org.opendaylight.tcpmd5.netty.MD5ServerChannelFactory;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implementation of BGPDispatcher.
*/
-public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl, BGPSessionListener> implements BGPDispatcher, AutoCloseable {
+public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
private final MD5ServerChannelFactory<?> scf;
private final MD5ChannelFactory<?> cf;
private final BGPHandlerFactory hf;
+ private final EventLoopGroup bossGroup;
+ private final EventLoopGroup workerGroup;
+ private final EventExecutor executor;
private KeyMapping keys;
- private static final String NEGOTIATOR = "negotiator";
public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
this(messageRegistry, bossGroup, workerGroup, null, null);
}
public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final MD5ChannelFactory<?> cf, final MD5ServerChannelFactory<?> scf) {
- super(bossGroup, workerGroup);
+ this.bossGroup = Preconditions.checkNotNull(bossGroup);
+ this.workerGroup = Preconditions.checkNotNull(workerGroup);
+ this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE);
this.hf = new BGPHandlerFactory(messageRegistry);
this.cf = cf;
this.scf = scf;
+ this.keys = null;
}
@Override
public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress address,
- final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy) {
+ final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy) {
final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, listener);
- return super.createClient(address, strategy, new PipelineInitializer<BGPSessionImpl>() {
- @Override
- public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
- ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders());
- ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(null, ch, promise));
- ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders());
- }
- });
- }
-
- @Override
- public Future<Void> createReconnectingClient(final InetSocketAddress address,
- final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategyFactory connectStrategyFactory,
- final ReconnectStrategyFactory reestablishStrategyFactory) {
- return this.createReconnectingClient(address, remoteAs, listener, connectStrategyFactory, reestablishStrategyFactory,
- null);
+ final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer
+ (BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders());
+
+ final Bootstrap b = new Bootstrap();
+ final BGPProtocolSessionPromise p = new BGPProtocolSessionPromise(this.executor, address, strategy, b);
+ b.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(true));
+ b.handler(BGPChannel.createChannelInitializer(initializer, p));
+ this.customizeBootstrap(b);
+ this.setWorkerGroup(b);
+ p.connect();
+ LOG.debug("Client created.");
+ return p;
}
@Override
public void close() {
+ try {
+ this.workerGroup.shutdownGracefully();
+ } finally {
+ this.bossGroup.shutdownGracefully();
+ }
}
@Override
public synchronized Future<Void> createReconnectingClient(final InetSocketAddress address,
- final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory connectStrategyFactory,
- final ReconnectStrategyFactory reestablishStrategyFactory, final KeyMapping keys) {
+ final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory connectStrategyFactory,
+ final KeyMapping keys) {
final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, peerRegistry);
-
this.keys = keys;
- final Future<Void> ret = super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategyFactory.createReconnectStrategy(), new PipelineInitializer<BGPSessionImpl>() {
- @Override
- public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
- ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders());
- ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(null, ch, promise));
- ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders());
- }
- });
+
+ final Bootstrap b = new Bootstrap();
+ final BGPReconnectPromise p = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, address,
+ connectStrategyFactory, b, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders()));
+ b.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(true));
+ this.customizeBootstrap(b);
+ this.setWorkerGroup(b);
+ p.connect();
+
this.keys = null;
- return ret;
+ return p;
}
@Override
public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address, final BGPSessionValidator sessionValidator) {
- return this.createServer(registry, address, sessionValidator, null);
- }
-
- @Override
- public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address, final BGPSessionValidator sessionValidator, final KeyMapping keys) {
final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(sessionValidator, registry);
-
- this.keys = keys;
- final ChannelFuture ret = super.createServer(address, new PipelineInitializer<BGPSessionImpl>() {
- @Override
- public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
- ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders());
- ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(null, ch, promise));
- ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders());
- }
- });
- this.keys = null;
-
- return ret;
+ final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer
+ (BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders());
+ final ServerBootstrap b = new ServerBootstrap();
+ b.childHandler(BGPChannel.createChannelInitializer(initializer, new DefaultPromise(BGPDispatcherImpl.this.executor)));
+ b.option(ChannelOption.SO_BACKLOG, Integer.valueOf(128));
+ b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ this.customizeBootstrap(b);
+
+ final ChannelFuture f = b.bind(address);
+ LOG.debug("Initiated server {} at {}.", f, address);
+ return f;
}
- @Override
protected void customizeBootstrap(final Bootstrap b) {
if (this.keys != null && !this.keys.isEmpty()) {
if (this.cf == null) {
b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
}
- @Override
- protected void customizeBootstrap(final ServerBootstrap b) {
+ private void customizeBootstrap(final ServerBootstrap b) {
if (this.keys != null && !this.keys.isEmpty()) {
if (this.scf == null) {
throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
}
// Make sure we are doing round-robin processing
- b.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+ b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+
+ if (b.group() == null) {
+ b.group(this.bossGroup, this.workerGroup);
+ }
+
+ try {
+ b.channel(NioServerSocketChannel.class);
+ } catch (IllegalStateException e) {
+ LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+ }
+ }
+
+ private void setWorkerGroup(final Bootstrap b) {
+ if (b.group() == null) {
+ b.group(this.workerGroup);
+ }
+ try {
+ b.channel(NioSocketChannel.class);
+ } catch (IllegalStateException e) {
+ LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+ }
+ }
+
+ public interface ChannelPipelineInitializer {
+ void initializeChannel(SocketChannel socketChannel, Promise<BGPSessionImpl> promise);
}
+ public static class BGPChannel {
+ private static final String NEGOTIATOR = "negotiator";
+
+ private BGPChannel() {
+
+ }
+
+ public static <T extends BGPSessionNegotiatorFactory> ChannelPipelineInitializer createChannelPipelineInitializer(final ChannelHandler[] channelDecoder,
+ final T snf,
+ final ChannelHandler[] channelEncoder) {
+ return new ChannelPipelineInitializer() {
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
+ ch.pipeline().addLast(channelDecoder);
+ ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(ch, promise));
+ ch.pipeline().addLast(channelEncoder);
+ }
+ };
+ }
+
+ public static ChannelHandler createChannelInitializer(final ChannelPipelineInitializer initializer, final Promise<BGPSessionImpl> promise) {
+ return new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ initializer.initializeChannel(ch, promise);
+ }
+ };
+ }
+ }
}
+
+
this.runtimeReg = null;
}
if (this.session != null) {
- this.session.close();
+ try {
+ this.session.close();
+ } catch (final Exception e) {
+ LOG.warn("Error closing session with peer", e);
+ }
this.session = null;
}
}
package org.opendaylight.protocol.bgp.rib.impl;
import com.google.common.base.Preconditions;
-
import io.netty.channel.Channel;
import io.netty.util.concurrent.Promise;
-
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator;
-import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.SessionListenerFactory;
-import org.opendaylight.protocol.framework.SessionNegotiator;
-import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
-import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
+import org.opendaylight.protocol.bgp.rib.spi.SessionNegotiator;
-public final class BGPServerSessionNegotiatorFactory implements SessionNegotiatorFactory<Notification, BGPSessionImpl, BGPSessionListener> {
+public final class BGPServerSessionNegotiatorFactory implements BGPSessionNegotiatorFactory<BGPSessionImpl> {
private final BGPSessionValidator validator;
private final BGPPeerRegistry registry;
}
@Override
- public SessionNegotiator<BGPSessionImpl> getSessionNegotiator(final SessionListenerFactory<BGPSessionListener> factory,
- final Channel channel, final Promise<BGPSessionImpl> promise) {
+ public SessionNegotiator getSessionNegotiator(final Channel channel, final Promise<BGPSessionImpl> promise) {
return new BGPServerSessionNegotiator(promise, channel, registry, validator);
}
}
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
import java.io.IOException;
import java.util.Date;
import java.util.Set;
import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Keepalive;
import org.slf4j.LoggerFactory;
@VisibleForTesting
-public class BGPSessionImpl extends AbstractProtocolSession<Notification> implements BGPSession, BGPSessionStatistics {
+public class BGPSessionImpl extends SimpleChannelInboundHandler<Notification> implements BGPSession, BGPSessionStatistics, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BGPSessionImpl.class);
*
* @param msg incoming message
*/
- @Override
public synchronized void handleMessage(final Notification msg) {
// Update last reception time
this.lastMessageReceivedAt = System.nanoTime();
}
}
- @Override
public synchronized void endOfInput() {
if (this.state == State.UP) {
this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
* Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be
* modified, because he initiated the closing. (To prevent concurrent modification exception).
*
- * @param closeObject
+ * @param error
*/
private void terminate(final BGPError error) {
this.writeAndFlush(new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()).build());
return this.tableTypes;
}
- @Override
protected synchronized void sessionUp() {
this.sessionStats.startSessionStopwatch();
this.state = State.UP;
ChannelOutputLimiter getLimiter() {
return this.limiter;
}
+
+ @Override
+ public final void channelInactive(ChannelHandlerContext ctx) {
+ LOG.debug("Channel {} inactive.", ctx.channel());
+ this.endOfInput();
+
+ try {
+ super.channelInactive(ctx);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+ }
+ }
+
+ @Override
+ protected final void channelRead0(ChannelHandlerContext ctx, Notification msg) {
+ LOG.debug("Message was received: {}", msg);
+ this.handleMessage(msg);
+ }
+
+ @Override
+ public final void handlerAdded(ChannelHandlerContext ctx) {
+ this.sessionUp();
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl.protocol;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+import java.net.InetSocketAddress;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultPromise<S> {
+ private static final Logger LOG = LoggerFactory.getLogger(BGPProtocolSessionPromise.class);
+ private final ReconnectStrategy strategy;
+ private final Bootstrap b;
+
+ private InetSocketAddress address;
+ @GuardedBy("this")
+ private Future<?> pending;
+
+ public BGPProtocolSessionPromise(EventExecutor executor, InetSocketAddress address, ReconnectStrategy strategy, Bootstrap b) {
+ super(executor);
+ this.strategy = Preconditions.checkNotNull(strategy);
+ this.address = Preconditions.checkNotNull(address);
+ this.b = Preconditions.checkNotNull(b);
+ }
+
+ public synchronized void connect() {
+ final BGPProtocolSessionPromise lock = this;
+
+ try {
+ int e = this.strategy.getConnectTimeout();
+ LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(e));
+ if (this.address.isUnresolved()) {
+ this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort());
+ }
+
+ this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, e);
+ final ChannelFuture connectFuture = this.b.connect(this.address);
+ connectFuture.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener(lock));
+ this.pending = connectFuture;
+ } catch (Exception e) {
+ LOG.info("Failed to connect to {}", this.address, e);
+ this.setFailure(e);
+ }
+
+ }
+
+ @Override
+ public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ if (super.cancel(mayInterruptIfRunning)) {
+ this.pending.cancel(mayInterruptIfRunning);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public synchronized Promise<S> setSuccess(final S result) {
+ LOG.debug("Promise {} completed", this);
+ this.strategy.reconnectSuccessful();
+ return super.setSuccess(result);
+ }
+
+ private class BootstrapConnectListener implements ChannelFutureListener {
+ private final Object lock;
+
+ public BootstrapConnectListener(final Object lock) {
+ this.lock = lock;
+ }
+
+ @Override
+ public void operationComplete(final ChannelFuture cf) throws Exception {
+ synchronized (this.lock) {
+ BGPProtocolSessionPromise.LOG.debug("Promise {} connection resolved", this.lock);
+ Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(cf));
+ if (BGPProtocolSessionPromise.this.isCancelled()) {
+ if (cf.isSuccess()) {
+ BGPProtocolSessionPromise.LOG.debug("Closing channel for cancelled promise {}", this.lock);
+ cf.channel().close();
+ }
+
+ } else if (cf.isSuccess()) {
+ BGPProtocolSessionPromise.LOG.debug("Promise {} connection successful", this.lock);
+ } else {
+ BGPProtocolSessionPromise.LOG.debug("Attempt to connect to {} failed", BGPProtocolSessionPromise.this.address, cf.cause());
+ final Future rf = BGPProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
+ rf.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener.ReconnectingStrategyListener());
+ BGPProtocolSessionPromise.this.pending = rf;
+ }
+ }
+ }
+
+ private class ReconnectingStrategyListener implements FutureListener<Void> {
+ private ReconnectingStrategyListener() {
+ }
+
+ @Override
+ public void operationComplete(final Future<Void> sf) {
+ synchronized (BootstrapConnectListener.this.lock) {
+ Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(sf));
+ if (!BGPProtocolSessionPromise.this.isCancelled()) {
+ if (sf.isSuccess()) {
+ BGPProtocolSessionPromise.this.connect();
+ } else {
+ BGPProtocolSessionPromise.this.setFailure(sf.cause());
+ }
+ }
+
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl.protocol;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+import java.net.InetSocketAddress;
+import org.opendaylight.protocol.bgp.rib.impl.BGPDispatcherImpl;
+import org.opendaylight.protocol.bgp.rib.impl.BGPSessionImpl;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BGPReconnectPromise extends DefaultPromise<Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(BGPReconnectPromise.class);
+
+ private final InetSocketAddress address;
+ private final ReconnectStrategyFactory strategyFactory;
+ private final Bootstrap b;
+ private final BGPDispatcherImpl.ChannelPipelineInitializer initializer;
+ private final EventExecutor executor;
+ private Future<BGPSessionImpl> pending;
+
+ public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
+ final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap b,
+ final BGPDispatcherImpl.ChannelPipelineInitializer initializer) {
+ super(executor);
+ this.executor = executor;
+ this.b = b;
+ this.initializer = Preconditions.checkNotNull(initializer);
+ this.address = Preconditions.checkNotNull(address);
+ this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
+ }
+
+ public synchronized void connect() {
+ final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
+
+ // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
+ pending = createClient(this.address, cs, b, new BGPDispatcherImpl.ChannelPipelineInitializer() {
+ @Override
+ public void initializeChannel(final SocketChannel channel, final Promise<BGPSessionImpl> promise) {
+ initializer.initializeChannel(channel, promise);
+ // add closed channel handler
+ // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+ // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+ // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+ channel.pipeline().addLast(new ClosedChannelHandler(BGPReconnectPromise.this));
+ }
+ });
+
+ pending.addListener(new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess()) {
+ BGPReconnectPromise.this.setFailure(future.cause());
+ }
+ }
+ });
+ }
+
+ public Future<BGPSessionImpl> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap,
+ final BGPDispatcherImpl.ChannelPipelineInitializer initializer) {
+ final BGPProtocolSessionPromise p = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
+ final ChannelHandler chInit = BGPDispatcherImpl.BGPChannel.createChannelInitializer(initializer, p);
+ bootstrap.handler(chInit);
+ p.connect();
+ LOG.debug("Client created.");
+ return p;
+ }
+
+ /**
+ * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
+ */
+ private boolean isInitialConnectFinished() {
+ Preconditions.checkNotNull(pending);
+ return pending.isDone() && pending.isSuccess();
+ }
+
+ @Override
+ public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ if (super.cancel(mayInterruptIfRunning)) {
+ Preconditions.checkNotNull(pending);
+ this.pending.cancel(mayInterruptIfRunning);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Channel handler that responds to channelInactive event and reconnects the session.
+ * Only if the promise was not canceled.
+ */
+ private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
+ private final BGPReconnectPromise promise;
+
+ public ClosedChannelHandler(final BGPReconnectPromise promise) {
+ this.promise = promise;
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ // This is the ultimate channel inactive handler, not forwarding
+ if (promise.isCancelled()) {
+ return;
+ }
+
+ if (!promise.isInitialConnectFinished()) {
+ LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
+ }
+
+ LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
+ promise.connect();
+ }
+ }
+}
*/
package org.opendaylight.protocol.bgp.rib.impl.spi;
+import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import java.net.InetSocketAddress;
import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
/**
* Dispatcher class for creating BGP clients.
*/
-public interface BGPDispatcher extends BGPServerDispatcher {
+public interface BGPDispatcher{
/**
* Creates BGP client.
BGPPeerRegistry peerRegistry, ReconnectStrategy strategy);
Future<Void> createReconnectingClient(InetSocketAddress address, AsNumber remoteAs,
- BGPPeerRegistry peerRegistry, ReconnectStrategyFactory connectStrategyFactory,
- ReconnectStrategyFactory reestablishStrategyFactory);
+ BGPPeerRegistry peerRegistry, ReconnectStrategyFactory connectStrategyFactory, KeyMapping keys);
- Future<Void> createReconnectingClient(InetSocketAddress address, AsNumber remoteAs,
- BGPPeerRegistry peerRegistry, ReconnectStrategyFactory connectStrategyFactory,
- ReconnectStrategyFactory reestablishStrategyFactory, KeyMapping keys);
+ /**
+ * Create new BGP server to accept incoming bgp connections (bound to provided socket address).
+ */
+ ChannelFuture createServer(BGPPeerRegistry peerRegistry, InetSocketAddress address, BGPSessionValidator sessionValidator);
}
Mockito.doReturn(GlobalEventExecutor.INSTANCE.newSucceededFuture(null)).when(this.dispatcher).createReconnectingClient(
Mockito.any(InetSocketAddress.class), Mockito.any(AsNumber.class),
- Mockito.any(BGPPeerRegistry.class), Mockito.eq(this.tcpStrategyFactory), Mockito.eq(this.sessionStrategy),
- Mockito.any(KeyMapping.class));
+ Mockito.any(BGPPeerRegistry.class), Mockito.eq(this.tcpStrategyFactory), Mockito.any(KeyMapping.class));
this.ext1 = new SimpleRIBExtensionProviderContext();
this.ext2 = new SimpleRIBExtensionProviderContext();
public void releaseConnection() {
LOG.debug("Releasing connection");
if (this.session != null) {
- this.session.close();
+ try {
+ this.session.close();
+ } catch (Exception e) {
+ LOG.warn("Error closing session", e);
+ }
+ this.session = null;
}
}
import com.google.common.base.Optional;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
-import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.AbstractDispatcher;
import org.opendaylight.protocol.framework.ReconnectStrategy;
import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
-public class TestClientDispatcher extends AbstractDispatcher<BGPSessionImpl, BGPSessionListener> {
-
- private static final String NEGOTIATOR = "negotiator";
+public class TestClientDispatcher {
private final BGPHandlerFactory hf;
- private InetSocketAddress localAddress;
private final InetSocketAddress defaulAddress;
+ private InetSocketAddress localAddress;
+ private final BGPDispatcherImpl disp;
protected TestClientDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final MessageRegistry messageRegistry,
- final InetSocketAddress locaAddress) {
- super(bossGroup, workerGroup);
+ final InetSocketAddress locaAddress) {
+ disp = new BGPDispatcherImpl(messageRegistry, bossGroup, workerGroup) {
+ @Override
+ protected void customizeBootstrap(final Bootstrap b) {
+ b.localAddress(locaAddress);
+ }
+ };
this.hf = new BGPHandlerFactory(messageRegistry);
this.localAddress = locaAddress;
this.defaulAddress = locaAddress;
}
public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress,
- final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy, final Optional<InetSocketAddress> localAddress) {
+ final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy, final Optional<InetSocketAddress> localAddress) {
setLocalAddress(localAddress);
- final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, listener);
- return super.createClient(remoteAddress, strategy, new PipelineInitializer<BGPSessionImpl>() {
-
- @Override
- public void initializeChannel(SocketChannel ch, Promise<BGPSessionImpl> promise) {
- ch.pipeline().addLast(TestClientDispatcher.this.hf.getDecoders());
- ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(null, ch, promise));
- ch.pipeline().addLast(TestClientDispatcher.this.hf.getEncoders());
- }
- });
+ return disp.createClient(remoteAddress, remoteAs, listener, strategy);
}
public synchronized Future<Void> createReconnectingClient(final InetSocketAddress address,
- final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory reconnectStrategyFactory,
- final Optional<InetSocketAddress> localAddress) {
+ final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory reconnectStrategyFactory,
+ final Optional<InetSocketAddress> localAddress) {
setLocalAddress(localAddress);
- final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, peerRegistry);
- final Future<Void> ret = super.createReconnectingClient(address, reconnectStrategyFactory, new PipelineInitializer<BGPSessionImpl>() {
- @Override
- public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
- ch.pipeline().addLast(TestClientDispatcher.this.hf.getDecoders());
- ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(null, ch, promise));
- ch.pipeline().addLast(TestClientDispatcher.this.hf.getEncoders());
- }
- });
-
- return ret;
- }
-
- @Override
- protected void customizeBootstrap(Bootstrap b) {
- b.localAddress(this.localAddress);
- super.customizeBootstrap(b);
+ return disp.createReconnectingClient(address, remoteAs, peerRegistry, reconnectStrategyFactory, null);
}
private synchronized void setLocalAddress(final Optional<InetSocketAddress> localAddress) {
<groupId>org.opendaylight.yangtools.model</groupId>
<artifactId>ietf-inet-types</artifactId>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>protocol-framework</artifactId>
- </dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
+import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.Set;
import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl;
private static final long AS = 30L;
+ @Override
+ public void channelRegistered(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+ }
+
+ @Override
+ public void channelUnregistered(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+ }
+
+ @Override
+ public void channelActive(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext channelHandlerContext, final Object o) throws Exception {
+
+ }
+
+ @Override
+ public void channelReadComplete(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+ }
+
+ @Override
+ public void userEventTriggered(final ChannelHandlerContext channelHandlerContext, final Object o) throws Exception {
+
+ }
+
+ @Override
+ public void channelWritabilityChanged(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+ }
+
+ @Override
+ public void handlerAdded(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+ }
+
+ @Override
+ public void handlerRemoved(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext channelHandlerContext, final Throwable throwable) throws Exception {
+
+ }
+
@Override
public void close() {
LOG.debug("Session {} closed", this);
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-api</artifactId>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>protocol-framework</artifactId>
- </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-core-api</artifactId>
*/
package org.opendaylight.protocol.bgp.rib.spi;
+import io.netty.channel.ChannelInboundHandler;
import java.util.Set;
-
-import org.opendaylight.protocol.framework.ProtocolSession;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.BgpTableType;
-import org.opendaylight.yangtools.yang.binding.Notification;
/**
* BGP Session represents the finite state machine in BGP, including timers and its purpose is to create a BGP
*
* If the session is up, it has to redirect messages to/from user. Handles also malformed messages and unknown requests.
*/
-public interface BGPSession extends ProtocolSession<Notification> {
+public interface BGPSession extends AutoCloseable, ChannelInboundHandler {
/**
* Return the list of tables which the peer has advertised to support.
*
*/
package org.opendaylight.protocol.bgp.rib.spi;
-import org.opendaylight.protocol.framework.SessionListener;
+import java.util.EventListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey;
import org.opendaylight.yangtools.yang.binding.Notification;
/**
* Listener that receives session informations from the session.
*/
-public interface BGPSessionListener extends SessionListener<Notification, BGPSession, BGPTerminationReason> {
+public interface BGPSessionListener extends EventListener {
/**
* Returns state of BGP session associated with this listener.
* @param tablesKey of the table where synchronization finished
*/
void markUptodate(final TablesKey tablesKey);
+
+ /**
+ * Fired when the session was established successfully.
+ *
+ * @param session Peer address families which we accepted
+ */
+ void onSessionUp(BGPSession session);
+ /**
+ * Fired when the session went down because of an IO error. Implementation should take care of closing underlying
+ * session.
+ *
+ * @param session that went down
+ * @param e Exception that was thrown as the cause of session being down
+ */
+
+ void onSessionDown(BGPSession session, Exception e);
+ /**
+ * Fired when the session is terminated locally. The session has already been closed and transitioned to IDLE state.
+ * Any outstanding queued messages were not sent. The user should not attempt to make any use of the session.
+ *
+ * @param reason the cause why the session went down
+ */
+ void onSessionTerminated(BGPSession session, BGPTerminationReason reason);
+
+ /**
+ * Fired when a normal protocol message is received.
+ *
+ * @param notification Protocol message
+ */
+ void onMessage(BGPSession session, Notification notification);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.spi;
+
+import io.netty.channel.Channel;
+import io.netty.util.concurrent.Promise;
+
+public interface BGPSessionNegotiatorFactory<S extends BGPSession> {
+ SessionNegotiator getSessionNegotiator(Channel channel, Promise<S> promise);
+}
\ No newline at end of file
import com.google.common.base.MoreObjects;
import org.opendaylight.protocol.bgp.parser.BGPError;
-import org.opendaylight.protocol.framework.TerminationReason;
-public final class BGPTerminationReason implements TerminationReason {
+public final class BGPTerminationReason {
private final BGPError error;
public BGPTerminationReason(final BGPError error) {
this.error = error;
}
- @Override
public String getErrorMessage() {
return error.toString();
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.spi;
+
+import io.netty.channel.ChannelInboundHandler;
+
+public interface SessionNegotiator extends ChannelInboundHandler {
+}
@Override
public void onSessionDown(final BGPSession session, final Exception e) {
LOG.info("Client Listener: Connection lost.");
- session.close();
+ try {
+ session.close();
+ } catch (Exception ie) {
+ LOG.warn("Error closing session", ie);
+ }
}
@Override
import com.google.common.base.Preconditions;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
import org.opendaylight.protocol.bgp.parser.spi.pojo.ServiceLoaderBGPExtensionProviderContext;
+import org.opendaylight.protocol.bgp.rib.impl.BGPDispatcherImpl;
import org.opendaylight.protocol.bgp.rib.impl.BGPHandlerFactory;
import org.opendaylight.protocol.bgp.rib.impl.BGPServerSessionNegotiatorFactory;
import org.opendaylight.protocol.bgp.rib.impl.BGPSessionImpl;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator;
import org.opendaylight.protocol.bgp.rib.impl.spi.ReusableBGPPeer;
import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.AbstractDispatcher;
-import org.opendaylight.protocol.framework.ProtocolSession;
-import org.opendaylight.protocol.framework.SessionListener;
-import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv4AddressFamily;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.SubsequentAddressFamily;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
-import org.opendaylight.yangtools.yang.binding.Notification;
-public class BGPSpeakerMock<M, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends AbstractDispatcher<S, L> {
+public class BGPSpeakerMock {
- private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
+ private final BGPServerSessionNegotiatorFactory negotiatorFactory;
private final BGPHandlerFactory factory;
+ private final BGPDispatcherImpl disp;
+ private final BGPPeerRegistry peerRegistry;
+ private final Map<Class<? extends AddressFamily>, Class<? extends SubsequentAddressFamily>> tables;
- public BGPSpeakerMock(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final BGPHandlerFactory factory,
- final DefaultPromise<BGPSessionImpl> defaultPromise) {
- super(GlobalEventExecutor.INSTANCE, new NioEventLoopGroup(), new NioEventLoopGroup());
+ private BGPSpeakerMock(final BGPServerSessionNegotiatorFactory negotiatorFactory, final BGPHandlerFactory factory,
+ final DefaultPromise<BGPSessionImpl> defaultPromise) {
+ disp = new BGPDispatcherImpl(null, new NioEventLoopGroup(), new NioEventLoopGroup());
this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
this.factory = Preconditions.checkNotNull(factory);
- }
- public void createServer(final InetSocketAddress address) {
- super.createServer(address, new PipelineInitializer<S>() {
+ peerRegistry = new BGPPeerRegistry() {
@Override
- public void initializeChannel(final SocketChannel ch, final Promise<S> promise) {
- ch.pipeline().addLast(BGPSpeakerMock.this.factory.getDecoders());
- ch.pipeline().addLast("negotiator",
- BGPSpeakerMock.this.negotiatorFactory.getSessionNegotiator(null, ch, promise));
- ch.pipeline().addLast(BGPSpeakerMock.this.factory.getEncoders());
+ public void addPeer(final IpAddress ip, final ReusableBGPPeer peer, final BGPSessionPreferences prefs) {
}
- });
- }
-
- public static void main(final String[] args) throws Exception {
- final Map<Class<? extends AddressFamily>, Class<? extends SubsequentAddressFamily>> tables = new HashMap<>();
- tables.put(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
- tables.put(LinkstateAddressFamily.class, LinkstateSubsequentAddressFamily.class);
- final BGPPeerRegistry peerRegistry = new BGPPeerRegistry() {
@Override
- public void addPeer(final IpAddress ip, final ReusableBGPPeer peer, final BGPSessionPreferences prefs) {}
-
- @Override
- public void removePeer(final IpAddress ip) {}
+ public void removePeer(final IpAddress ip) {
+ }
@Override
public boolean isPeerConfigured(final IpAddress ip) {
}
@Override
- public void removePeerSession(final IpAddress ip) {}
+ public void removePeerSession(final IpAddress ip) {
+ }
};
- final SessionNegotiatorFactory<Notification, BGPSessionImpl, BGPSessionListener> snf = new BGPServerSessionNegotiatorFactory(new BGPSessionValidator() {
+ tables = new HashMap<>();
+ tables.put(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
+ tables.put(LinkstateAddressFamily.class, LinkstateSubsequentAddressFamily.class);
+ }
+
+ public void main(final String[] args) {
+
+ final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(new BGPSessionValidator() {
@Override
public void validate(final Open openObj, final BGPSessionPreferences prefs) throws BGPDocumentedException {
// NOOP
}
}, peerRegistry);
- final BGPSpeakerMock<Notification, BGPSessionImpl, BGPSessionListener> mock = new BGPSpeakerMock<>(snf, new BGPHandlerFactory(ServiceLoaderBGPExtensionProviderContext.getSingletonInstance().getMessageRegistry()), new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
+ final BGPSpeakerMock mock = new BGPSpeakerMock(snf, new BGPHandlerFactory(ServiceLoaderBGPExtensionProviderContext.getSingletonInstance().getMessageRegistry()), new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
mock.createServer(new InetSocketAddress("127.0.0.2", 12345));
}
+
+ private void createServer(final InetSocketAddress address) {
+ disp.createServer(peerRegistry,address, new BGPSessionValidator() {
+ @Override
+ public void validate(final Open openObj, final BGPSessionPreferences prefs) throws BGPDocumentedException {
+ // NOOP
+ }
+ });
+ }
}
@Override
public void onSessionDown(final BGPSession session, final Exception e) {
LOG.info("Server: Session down.");
- session.close();
+ try {
+ session.close();
+ } catch (Exception ie) {
+ LOG.warn("Error closing session", ie);
+ }
// this.d.stop();
}
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-binding-config</artifactId>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>protocol-framework</artifactId>
- </dependency>
<dependency>
<groupId>io.netty</groupId>