BUG-2873 : Remove dependency protocol-framework on BGP 76/23276/14
authorClaudio D. Gasparini <cgaspari@cisco.com>
Thu, 25 Jun 2015 12:24:43 +0000 (14:24 +0200)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 13 Jul 2015 11:45:35 +0000 (11:45 +0000)
Change-Id: Id04890456a517ca416f18be3de5bb3e777a5bfe9
Signed-off-by: Claudio D. Gasparini <cgaspari@cisco.com>
26 files changed:
bgp/parser-api/src/main/java/org/opendaylight/protocol/bgp/parser/BGPErrorIdentifier.java
bgp/rib-impl/src/main/java/org/opendaylight/controller/config/yang/bgp/rib/impl/BGPPeerModule.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPSessionNegotiator.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPClientSessionNegotiatorFactory.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPPeer.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPServerSessionNegotiatorFactory.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPProtocolSessionPromise.java [new file with mode: 0644]
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPReconnectPromise.java [new file with mode: 0644]
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/spi/BGPDispatcher.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ParserToSalTest.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/SimpleSessionListener.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/TestClientDispatcher.java
bgp/rib-mock/pom.xml
bgp/rib-mock/src/main/java/org/opendaylight/protocol/bgp/rib/mock/EventBusRegistration.java
bgp/rib-spi/pom.xml
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/BGPSession.java
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/BGPSessionListener.java
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/BGPSessionNegotiatorFactory.java [new file with mode: 0644]
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/BGPTerminationReason.java
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/SessionNegotiator.java [new file with mode: 0644]
bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/TestingListener.java
bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/BGPSpeakerMock.java
bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/SpeakerSessionListener.java
bgp/topology-provider/pom.xml

index acfcb7c3a1ccd8a982847a34a9f4cce7295eb109..bcaa35242e72e7303a396c33ddcb524aa041c0d3 100644 (file)
@@ -7,10 +7,13 @@
  */
 package org.opendaylight.protocol.bgp.parser;
 
+import java.io.Serializable;
+
 /**
  * Caret for combination of Error-type and Error-value
  */
-final class BGPErrorIdentifier {
+final class BGPErrorIdentifier implements Serializable {
+    private static final long serialVersionUID = 5722575354944165734L;
     private final short code;
     private final short subcode;
 
index c5153be899e532cbff953fb4223774901db16590..346852e318c0c9022c65de4ce22ab31919f29226 100644 (file)
@@ -209,8 +209,7 @@ public final class BGPPeerModule extends org.opendaylight.controller.config.yang
         }
 
         final RIB rib = getRibDependency();
-        return rib.getDispatcher().createReconnectingClient(address, remoteAs, registry, rib.getTcpStrategyFactory(),
-            rib.getSessionStrategyFactory(), keys);
+        return rib.getDispatcher().createReconnectingClient(address, remoteAs, registry, rib.getTcpStrategyFactory(), keys);
     }
 
     private BGPPeerRegistry getPeerRegistryBackwards() {
index 0539e5d4bc733c648e70045fb1a39e406b70ad69..96bf3af781ea82f5765074798f506555d0ba4c3b 100644 (file)
@@ -11,6 +11,10 @@ package org.opendaylight.protocol.bgp.rib.impl;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.Promise;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
@@ -20,7 +24,7 @@ import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
+import org.opendaylight.protocol.bgp.rib.spi.SessionNegotiator;
 import org.opendaylight.protocol.util.Values;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
@@ -39,9 +43,9 @@ import org.slf4j.LoggerFactory;
  * Bgp Session negotiator. Common for local-to-remote and remote-to-local connections.
  * One difference is session validation performed by injected BGPSessionValidator when OPEN message is received.
  */
-public abstract class AbstractBGPSessionNegotiator extends AbstractSessionNegotiator<Notification, BGPSessionImpl> {
+public abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandlerAdapter implements SessionNegotiator {
     // 4 minutes recommended in http://tools.ietf.org/html/rfc4271#section-8.2.2
-    protected static final int INITIAL_HOLDTIMER = 4;
+    private static final int INITIAL_HOLDTIMER = 4;
 
     /**
      * @see <a href="http://tools.ietf.org/html/rfc6793">BGP Support for 4-Octet AS Number Space</a>
@@ -72,7 +76,8 @@ public abstract class AbstractBGPSessionNegotiator extends AbstractSessionNegoti
     private static final Logger LOG = LoggerFactory.getLogger(AbstractBGPSessionNegotiator.class);
     private final BGPPeerRegistry registry;
     private final BGPSessionValidator sessionValidator;
-
+    private final Promise<BGPSessionImpl> promise;
+    private final Channel channel;
     @GuardedBy("this")
     private State state = State.IDLE;
 
@@ -81,13 +86,13 @@ public abstract class AbstractBGPSessionNegotiator extends AbstractSessionNegoti
 
     public AbstractBGPSessionNegotiator(final Promise<BGPSessionImpl> promise, final Channel channel,
             final BGPPeerRegistry registry, final BGPSessionValidator sessionValidator) {
-        super(promise, channel);
+        this.promise = Preconditions.checkNotNull(promise);
+        this.channel = Preconditions.checkNotNull(channel);
         this.registry = registry;
         this.sessionValidator = sessionValidator;
     }
 
-    @Override
-    protected synchronized void startNegotiation() {
+    private synchronized void startNegotiation() {
         Preconditions.checkState(this.state == State.IDLE);
 
         // Check if peer is configured in registry before retrieving preferences
@@ -133,7 +138,6 @@ public abstract class AbstractBGPSessionNegotiator extends AbstractSessionNegoti
         return StrictBGPPeerRegistry.getIpAddress(this.channel.remoteAddress());
     }
 
-    @Override
     protected synchronized void handleMessage(final Notification msg) {
         LOG.debug("Channel {} handling message in state {}", this.channel, this.state);
 
@@ -203,8 +207,7 @@ public abstract class AbstractBGPSessionNegotiator extends AbstractSessionNegoti
         }
     }
 
-    @Override
-    protected void negotiationFailed(final Throwable e) {
+    private void negotiationFailed(final Throwable e) {
         LOG.warn("Channel {} negotiation failed: {}", this.channel, e.getMessage());
         if (e instanceof BGPDocumentedException) {
             // although sendMessage() can also result in calling this method, it won't create a cycle. In case sendMessage() fails to
@@ -212,7 +215,7 @@ public abstract class AbstractBGPSessionNegotiator extends AbstractSessionNegoti
             this.sendMessage(buildErrorNotify(((BGPDocumentedException)e).getError(), ((BGPDocumentedException) e).getData()));
         }
         this.registry.removePeerSession(getRemoteIp());
-        super.negotiationFailed(e);
+        negotiationFailedCloseChannel(e);
         this.state = State.FINISHED;
     }
 
@@ -240,4 +243,63 @@ public abstract class AbstractBGPSessionNegotiator extends AbstractSessionNegoti
     public synchronized State getState() {
         return this.state;
     }
+
+    private final void negotiationSuccessful(BGPSessionImpl session) {
+        LOG.debug("Negotiation on channel {} successful with session {}", this.channel, session);
+        channel.pipeline().replace(this, "session", session);
+        promise.setSuccess(session);
+    }
+
+    private void negotiationFailedCloseChannel(Throwable cause) {
+        LOG.debug("Negotiation on channel {} failed", this.channel, cause);
+        channel.close();
+        promise.setFailure(cause);
+    }
+
+    private final void sendMessage(final Notification msg) {
+        channel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture f) {
+                if (!f.isSuccess()) {
+                    LOG.info("Failed to send message {}", msg, f.cause());
+                    negotiationFailedCloseChannel(f.cause());
+                } else {
+                    LOG.trace("Message {} sent to socket", msg);
+                }
+
+            }
+        });
+    }
+
+    @Override
+    public final void channelActive(ChannelHandlerContext ctx) {
+        LOG.debug("Starting session negotiation on channel {}", this.channel);
+
+        try {
+            this.startNegotiation();
+        } catch (final Exception e) {
+            LOG.warn("Unexpected negotiation failure", e);
+            negotiationFailedCloseChannel(e);
+        }
+
+    }
+
+    @Override
+    public final void channelRead(ChannelHandlerContext ctx, Object msg) {
+        LOG.debug("Negotiation read invoked on channel {}", this.channel);
+
+        try {
+            handleMessage((Notification) msg);
+        } catch (Exception e) {
+            LOG.debug("Unexpected error while handling negotiation message {}", msg, e);
+            negotiationFailedCloseChannel(e);
+        }
+
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        LOG.info("Unexpected error during negotiation", cause);
+        negotiationFailedCloseChannel(cause);
+    }
 }
index bcdba2fe65609812c5f82a5ff024f26a3b077f71..4b6736a177aceb8a6480e665ef55f96d06661785 100644 (file)
@@ -9,16 +9,12 @@ package org.opendaylight.protocol.bgp.rib.impl;
 
 import io.netty.channel.Channel;
 import io.netty.util.concurrent.Promise;
-
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
-import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.SessionListenerFactory;
-import org.opendaylight.protocol.framework.SessionNegotiator;
-import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
+import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
+import org.opendaylight.protocol.bgp.rib.spi.SessionNegotiator;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
-import org.opendaylight.yangtools.yang.binding.Notification;
 
-public final class BGPClientSessionNegotiatorFactory implements SessionNegotiatorFactory<Notification, BGPSessionImpl, BGPSessionListener> {
+public final class BGPClientSessionNegotiatorFactory implements BGPSessionNegotiatorFactory<BGPSessionImpl> {
     private final BGPClientSessionValidator validator;
     private final BGPPeerRegistry peerRegistry;
 
@@ -28,8 +24,7 @@ public final class BGPClientSessionNegotiatorFactory implements SessionNegotiato
     }
 
     @Override
-    public SessionNegotiator<BGPSessionImpl> getSessionNegotiator(final SessionListenerFactory<BGPSessionListener> factory,
-            final Channel channel, final Promise<BGPSessionImpl> promise) {
+    public SessionNegotiator getSessionNegotiator(final Channel channel, final Promise<BGPSessionImpl> promise) {
         return new BGPClientSessionNegotiator(promise, channel, peerRegistry, validator);
     }
 }
index 9ac117939ceb4acabc860a4107685636647fb6e1..baa2073ead0d5d347043cb4a4ccd436288d46f75 100644 (file)
@@ -7,21 +7,31 @@
  */
 package org.opendaylight.protocol.bgp.rib.impl;
 
+import com.google.common.base.Preconditions;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry;
+import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPProtocolSessionPromise;
+import org.opendaylight.protocol.bgp.rib.impl.protocol.BGPReconnectPromise;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator;
-import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.AbstractDispatcher;
+import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.tcpmd5.api.KeyMapping;
@@ -29,98 +39,99 @@ import org.opendaylight.tcpmd5.netty.MD5ChannelFactory;
 import org.opendaylight.tcpmd5.netty.MD5ChannelOption;
 import org.opendaylight.tcpmd5.netty.MD5ServerChannelFactory;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of BGPDispatcher.
  */
-public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl, BGPSessionListener> implements BGPDispatcher, AutoCloseable {
+public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
     private final MD5ServerChannelFactory<?> scf;
     private final MD5ChannelFactory<?> cf;
     private final BGPHandlerFactory hf;
+    private final EventLoopGroup bossGroup;
+    private final EventLoopGroup workerGroup;
+    private final EventExecutor executor;
     private KeyMapping keys;
-    private static final String NEGOTIATOR = "negotiator";
 
     public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
         this(messageRegistry, bossGroup, workerGroup, null, null);
     }
 
     public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final MD5ChannelFactory<?> cf, final MD5ServerChannelFactory<?> scf) {
-        super(bossGroup, workerGroup);
+        this.bossGroup = Preconditions.checkNotNull(bossGroup);
+        this.workerGroup = Preconditions.checkNotNull(workerGroup);
+        this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE);
         this.hf = new BGPHandlerFactory(messageRegistry);
         this.cf = cf;
         this.scf = scf;
+        this.keys = null;
     }
 
     @Override
     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress address,
-        final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy) {
+                                                            final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy) {
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, listener);
-        return super.createClient(address, strategy, new PipelineInitializer<BGPSessionImpl>() {
-            @Override
-            public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
-                ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders());
-                ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(null, ch, promise));
-                ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders());
-            }
-        });
-    }
-
-    @Override
-    public Future<Void> createReconnectingClient(final InetSocketAddress address,
-        final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategyFactory connectStrategyFactory,
-        final ReconnectStrategyFactory reestablishStrategyFactory) {
-        return this.createReconnectingClient(address, remoteAs, listener, connectStrategyFactory, reestablishStrategyFactory,
-            null);
+        final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer
+            (BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders());
+
+        final Bootstrap b = new Bootstrap();
+        final BGPProtocolSessionPromise p = new BGPProtocolSessionPromise(this.executor, address, strategy, b);
+        b.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(true));
+        b.handler(BGPChannel.createChannelInitializer(initializer, p));
+        this.customizeBootstrap(b);
+        this.setWorkerGroup(b);
+        p.connect();
+        LOG.debug("Client created.");
+        return p;
     }
 
     @Override
     public void close() {
+        try {
+            this.workerGroup.shutdownGracefully();
+        } finally {
+            this.bossGroup.shutdownGracefully();
+        }
     }
 
     @Override
     public synchronized Future<Void> createReconnectingClient(final InetSocketAddress address,
-        final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory connectStrategyFactory,
-        final ReconnectStrategyFactory reestablishStrategyFactory, final KeyMapping keys) {
+                                                              final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory connectStrategyFactory,
+                                                              final KeyMapping keys) {
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, peerRegistry);
-
         this.keys = keys;
-        final Future<Void> ret = super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategyFactory.createReconnectStrategy(), new PipelineInitializer<BGPSessionImpl>() {
-            @Override
-            public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
-                ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders());
-                ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(null, ch, promise));
-                ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders());
-            }
-        });
+
+        final Bootstrap b = new Bootstrap();
+        final BGPReconnectPromise p = new BGPReconnectPromise(GlobalEventExecutor.INSTANCE, address,
+            connectStrategyFactory, b, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders()));
+        b.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(true));
+        this.customizeBootstrap(b);
+        this.setWorkerGroup(b);
+        p.connect();
+
         this.keys = null;
 
-        return ret;
+        return p;
     }
 
     @Override
     public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address, final BGPSessionValidator sessionValidator) {
-        return this.createServer(registry, address, sessionValidator, null);
-    }
-
-    @Override
-    public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address, final BGPSessionValidator sessionValidator, final KeyMapping keys) {
         final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(sessionValidator, registry);
-
-        this.keys = keys;
-        final ChannelFuture ret = super.createServer(address, new PipelineInitializer<BGPSessionImpl>() {
-            @Override
-            public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
-                ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getDecoders());
-                ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(null, ch, promise));
-                ch.pipeline().addLast(BGPDispatcherImpl.this.hf.getEncoders());
-            }
-        });
-        this.keys = null;
-
-        return ret;
+        final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer
+            (BGPDispatcherImpl.this.hf.getDecoders(), snf, BGPDispatcherImpl.this.hf.getEncoders());
+        final ServerBootstrap b = new ServerBootstrap();
+        b.childHandler(BGPChannel.createChannelInitializer(initializer, new DefaultPromise(BGPDispatcherImpl.this.executor)));
+        b.option(ChannelOption.SO_BACKLOG, Integer.valueOf(128));
+        b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        this.customizeBootstrap(b);
+
+        final ChannelFuture f = b.bind(address);
+        LOG.debug("Initiated server {} at {}.", f, address);
+        return f;
     }
 
-    @Override
     protected void customizeBootstrap(final Bootstrap b) {
         if (this.keys != null && !this.keys.isEmpty()) {
             if (this.cf == null) {
@@ -134,8 +145,7 @@ public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl,
         b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
     }
 
-    @Override
-    protected void customizeBootstrap(final ServerBootstrap b) {
+    private void customizeBootstrap(final ServerBootstrap b) {
         if (this.keys != null && !this.keys.isEmpty()) {
             if (this.scf == null) {
                 throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
@@ -145,7 +155,63 @@ public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl,
         }
 
         // Make sure we are doing round-robin processing
-        b.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+        b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+
+        if (b.group() == null) {
+            b.group(this.bossGroup, this.workerGroup);
+        }
+
+        try {
+            b.channel(NioServerSocketChannel.class);
+        } catch (IllegalStateException e) {
+            LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+        }
+    }
+
+    private void setWorkerGroup(final Bootstrap b) {
+        if (b.group() == null) {
+            b.group(this.workerGroup);
+        }
+        try {
+            b.channel(NioSocketChannel.class);
+        } catch (IllegalStateException e) {
+            LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+        }
+    }
+
+    public interface ChannelPipelineInitializer {
+        void initializeChannel(SocketChannel socketChannel, Promise<BGPSessionImpl> promise);
     }
 
+    public static class BGPChannel {
+        private static final String NEGOTIATOR = "negotiator";
+
+        private BGPChannel() {
+
+        }
+
+        public static <T extends BGPSessionNegotiatorFactory> ChannelPipelineInitializer createChannelPipelineInitializer(final ChannelHandler[] channelDecoder,
+                                                                                                                          final T snf,
+                                                                                                                          final ChannelHandler[] channelEncoder) {
+            return new ChannelPipelineInitializer() {
+                @Override
+                public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
+                    ch.pipeline().addLast(channelDecoder);
+                    ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(ch, promise));
+                    ch.pipeline().addLast(channelEncoder);
+                }
+            };
+        }
+
+        public static ChannelHandler createChannelInitializer(final ChannelPipelineInitializer initializer, final Promise<BGPSessionImpl> promise) {
+            return new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel ch) {
+                    initializer.initializeChannel(ch, promise);
+                }
+            };
+        }
+    }
 }
+
+
index 54e1c69fdc8c2dad768498868c0782a198aeafe8..66b3d1c6f406925ba58772f458c89bde886fec02 100644 (file)
@@ -261,7 +261,11 @@ public class BGPPeer implements ReusableBGPPeer, Peer, AutoCloseable, BGPPeerRun
             this.runtimeReg = null;
         }
         if (this.session != null) {
-            this.session.close();
+            try {
+                this.session.close();
+            } catch (final Exception e) {
+                LOG.warn("Error closing session with peer", e);
+            }
             this.session = null;
         }
     }
index 38a1875471c9c5c525f2e56f93fdd1e4df464876..8726a797bada63c408e4f699a3f7b42f2e1f8810 100644 (file)
@@ -8,19 +8,14 @@
 package org.opendaylight.protocol.bgp.rib.impl;
 
 import com.google.common.base.Preconditions;
-
 import io.netty.channel.Channel;
 import io.netty.util.concurrent.Promise;
-
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator;
-import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.SessionListenerFactory;
-import org.opendaylight.protocol.framework.SessionNegotiator;
-import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
-import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.protocol.bgp.rib.spi.BGPSessionNegotiatorFactory;
+import org.opendaylight.protocol.bgp.rib.spi.SessionNegotiator;
 
-public final class BGPServerSessionNegotiatorFactory implements SessionNegotiatorFactory<Notification, BGPSessionImpl, BGPSessionListener> {
+public final class BGPServerSessionNegotiatorFactory implements BGPSessionNegotiatorFactory<BGPSessionImpl> {
     private final BGPSessionValidator validator;
     private final BGPPeerRegistry registry;
 
@@ -30,8 +25,7 @@ public final class BGPServerSessionNegotiatorFactory implements SessionNegotiato
     }
 
     @Override
-    public SessionNegotiator<BGPSessionImpl> getSessionNegotiator(final SessionListenerFactory<BGPSessionListener> factory,
-            final Channel channel, final Promise<BGPSessionImpl> promise) {
+    public SessionNegotiator getSessionNegotiator(final Channel channel, final Promise<BGPSessionImpl> promise) {
         return new BGPServerSessionNegotiator(promise, channel, registry, validator);
     }
 }
index c383f034b4dfa2a4aabb725401da01849565997d..01dd5673df3d892f10f36f561568c7c097888309 100644 (file)
@@ -16,6 +16,8 @@ import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
 import java.io.IOException;
 import java.util.Date;
 import java.util.Set;
@@ -31,7 +33,6 @@ import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionStatistics;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
 import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Keepalive;
@@ -52,7 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @VisibleForTesting
-public class BGPSessionImpl extends AbstractProtocolSession<Notification> implements BGPSession, BGPSessionStatistics {
+public class BGPSessionImpl extends SimpleChannelInboundHandler<Notification> implements BGPSession, BGPSessionStatistics, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(BGPSessionImpl.class);
 
@@ -190,7 +191,6 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
      *
      * @param msg incoming message
      */
-    @Override
     public synchronized void handleMessage(final Notification msg) {
         // Update last reception time
         this.lastMessageReceivedAt = System.nanoTime();
@@ -223,7 +223,6 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
         }
     }
 
-    @Override
     public synchronized void endOfInput() {
         if (this.state == State.UP) {
             this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
@@ -279,7 +278,7 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
      * Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be
      * modified, because he initiated the closing. (To prevent concurrent modification exception).
      *
-     * @param closeObject
+     * @param error
      */
     private void terminate(final BGPError error) {
         this.writeAndFlush(new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()).build());
@@ -364,7 +363,6 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
         return this.tableTypes;
     }
 
-    @Override
     protected synchronized void sessionUp() {
         this.sessionStats.startSessionStopwatch();
         this.state = State.UP;
@@ -412,4 +410,27 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
     ChannelOutputLimiter getLimiter() {
         return this.limiter;
     }
+
+    @Override
+    public final void channelInactive(ChannelHandlerContext ctx) {
+        LOG.debug("Channel {} inactive.", ctx.channel());
+        this.endOfInput();
+
+        try {
+            super.channelInactive(ctx);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+        }
+    }
+
+    @Override
+    protected final void channelRead0(ChannelHandlerContext ctx, Notification msg) {
+        LOG.debug("Message was received: {}", msg);
+        this.handleMessage(msg);
+    }
+
+    @Override
+    public final void handlerAdded(ChannelHandlerContext ctx) {
+        this.sessionUp();
+    }
 }
diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPProtocolSessionPromise.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPProtocolSessionPromise.java
new file mode 100644 (file)
index 0000000..8da1d74
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl.protocol;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+import java.net.InetSocketAddress;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultPromise<S> {
+    private static final Logger LOG = LoggerFactory.getLogger(BGPProtocolSessionPromise.class);
+    private final ReconnectStrategy strategy;
+    private final Bootstrap b;
+
+    private InetSocketAddress address;
+    @GuardedBy("this")
+    private Future<?> pending;
+
+    public BGPProtocolSessionPromise(EventExecutor executor, InetSocketAddress address, ReconnectStrategy strategy, Bootstrap b) {
+        super(executor);
+        this.strategy = Preconditions.checkNotNull(strategy);
+        this.address = Preconditions.checkNotNull(address);
+        this.b = Preconditions.checkNotNull(b);
+    }
+
+    public synchronized void connect() {
+        final BGPProtocolSessionPromise lock = this;
+
+        try {
+            int e = this.strategy.getConnectTimeout();
+            LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(e));
+            if (this.address.isUnresolved()) {
+                this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort());
+            }
+
+            this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, e);
+            final ChannelFuture connectFuture = this.b.connect(this.address);
+            connectFuture.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener(lock));
+            this.pending = connectFuture;
+        } catch (Exception e) {
+            LOG.info("Failed to connect to {}", this.address, e);
+            this.setFailure(e);
+        }
+
+    }
+
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        if (super.cancel(mayInterruptIfRunning)) {
+            this.pending.cancel(mayInterruptIfRunning);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized Promise<S> setSuccess(final S result) {
+        LOG.debug("Promise {} completed", this);
+        this.strategy.reconnectSuccessful();
+        return super.setSuccess(result);
+    }
+
+    private class BootstrapConnectListener implements ChannelFutureListener {
+        private final Object lock;
+
+        public BootstrapConnectListener(final Object lock) {
+            this.lock = lock;
+        }
+
+        @Override
+        public void operationComplete(final ChannelFuture cf) throws Exception {
+            synchronized (this.lock) {
+                BGPProtocolSessionPromise.LOG.debug("Promise {} connection resolved", this.lock);
+                Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(cf));
+                if (BGPProtocolSessionPromise.this.isCancelled()) {
+                    if (cf.isSuccess()) {
+                        BGPProtocolSessionPromise.LOG.debug("Closing channel for cancelled promise {}", this.lock);
+                        cf.channel().close();
+                    }
+
+                } else if (cf.isSuccess()) {
+                    BGPProtocolSessionPromise.LOG.debug("Promise {} connection successful", this.lock);
+                } else {
+                    BGPProtocolSessionPromise.LOG.debug("Attempt to connect to {} failed", BGPProtocolSessionPromise.this.address, cf.cause());
+                    final Future rf = BGPProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
+                    rf.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener.ReconnectingStrategyListener());
+                    BGPProtocolSessionPromise.this.pending = rf;
+                }
+            }
+        }
+
+        private class ReconnectingStrategyListener implements FutureListener<Void> {
+            private ReconnectingStrategyListener() {
+            }
+
+            @Override
+            public void operationComplete(final Future<Void> sf) {
+                synchronized (BootstrapConnectListener.this.lock) {
+                    Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(sf));
+                    if (!BGPProtocolSessionPromise.this.isCancelled()) {
+                        if (sf.isSuccess()) {
+                            BGPProtocolSessionPromise.this.connect();
+                        } else {
+                            BGPProtocolSessionPromise.this.setFailure(sf.cause());
+                        }
+                    }
+
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPReconnectPromise.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPReconnectPromise.java
new file mode 100644 (file)
index 0000000..9528d7f
--- /dev/null
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl.protocol;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+import java.net.InetSocketAddress;
+import org.opendaylight.protocol.bgp.rib.impl.BGPDispatcherImpl;
+import org.opendaylight.protocol.bgp.rib.impl.BGPSessionImpl;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BGPReconnectPromise extends DefaultPromise<Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(BGPReconnectPromise.class);
+
+    private final InetSocketAddress address;
+    private final ReconnectStrategyFactory strategyFactory;
+    private final Bootstrap b;
+    private final BGPDispatcherImpl.ChannelPipelineInitializer initializer;
+    private final EventExecutor executor;
+    private Future<BGPSessionImpl> pending;
+
+    public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
+                               final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap b,
+                               final BGPDispatcherImpl.ChannelPipelineInitializer initializer) {
+        super(executor);
+        this.executor = executor;
+        this.b = b;
+        this.initializer = Preconditions.checkNotNull(initializer);
+        this.address = Preconditions.checkNotNull(address);
+        this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
+    }
+
+    public synchronized void connect() {
+        final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
+
+        // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
+        pending = createClient(this.address, cs, b, new BGPDispatcherImpl.ChannelPipelineInitializer() {
+            @Override
+            public void initializeChannel(final SocketChannel channel, final Promise<BGPSessionImpl> promise) {
+                initializer.initializeChannel(channel, promise);
+                // add closed channel handler
+                // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+                // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+                // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+                channel.pipeline().addLast(new ClosedChannelHandler(BGPReconnectPromise.this));
+            }
+        });
+
+        pending.addListener(new GenericFutureListener<Future<Object>>() {
+            @Override
+            public void operationComplete(Future<Object> future) throws Exception {
+                if (!future.isSuccess()) {
+                    BGPReconnectPromise.this.setFailure(future.cause());
+                }
+            }
+        });
+    }
+
+    public Future<BGPSessionImpl> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap,
+                                               final BGPDispatcherImpl.ChannelPipelineInitializer initializer) {
+        final BGPProtocolSessionPromise p = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
+        final ChannelHandler chInit = BGPDispatcherImpl.BGPChannel.createChannelInitializer(initializer, p);
+        bootstrap.handler(chInit);
+        p.connect();
+        LOG.debug("Client created.");
+        return p;
+    }
+
+    /**
+     * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
+     */
+    private boolean isInitialConnectFinished() {
+        Preconditions.checkNotNull(pending);
+        return pending.isDone() && pending.isSuccess();
+    }
+
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        if (super.cancel(mayInterruptIfRunning)) {
+            Preconditions.checkNotNull(pending);
+            this.pending.cancel(mayInterruptIfRunning);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Channel handler that responds to channelInactive event and reconnects the session.
+     * Only if the promise was not canceled.
+     */
+    private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
+        private final BGPReconnectPromise promise;
+
+        public ClosedChannelHandler(final BGPReconnectPromise promise) {
+            this.promise = promise;
+        }
+
+        @Override
+        public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+            // This is the ultimate channel inactive handler, not forwarding
+            if (promise.isCancelled()) {
+                return;
+            }
+
+            if (!promise.isInitialConnectFinished()) {
+                LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
+            }
+
+            LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
+            promise.connect();
+        }
+    }
+}
index 8dcd5aed9685bc9397d6a0cc3cd837ea8fb03f7c..6f9f7f32d7a91dc1802a223c6c60b8f0ca9315d0 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.protocol.bgp.rib.impl.spi;
 
+import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.Future;
 import java.net.InetSocketAddress;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
@@ -18,7 +19,7 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.
 /**
  * Dispatcher class for creating BGP clients.
  */
-public interface BGPDispatcher extends BGPServerDispatcher {
+public interface BGPDispatcher{
 
     /**
      * Creates BGP client.
@@ -33,10 +34,10 @@ public interface BGPDispatcher extends BGPServerDispatcher {
             BGPPeerRegistry peerRegistry, ReconnectStrategy strategy);
 
     Future<Void> createReconnectingClient(InetSocketAddress address, AsNumber remoteAs,
-                                          BGPPeerRegistry peerRegistry, ReconnectStrategyFactory connectStrategyFactory,
-            ReconnectStrategyFactory reestablishStrategyFactory);
+                                          BGPPeerRegistry peerRegistry, ReconnectStrategyFactory connectStrategyFactory, KeyMapping keys);
 
-    Future<Void> createReconnectingClient(InetSocketAddress address, AsNumber remoteAs,
-                                          BGPPeerRegistry peerRegistry, ReconnectStrategyFactory connectStrategyFactory,
-            ReconnectStrategyFactory reestablishStrategyFactory, KeyMapping keys);
+    /**
+     * Create new BGP server to accept incoming bgp connections (bound to provided socket address).
+     */
+    ChannelFuture createServer(BGPPeerRegistry peerRegistry, InetSocketAddress address, BGPSessionValidator sessionValidator);
 }
index 9b738540eb283431cec08a08a44325ed93fda5c3..d4cd00a81b16d09f88a368b38f248e6892be4b4c 100644 (file)
@@ -115,8 +115,7 @@ public class ParserToSalTest extends AbstractDataBrokerTest {
 
         Mockito.doReturn(GlobalEventExecutor.INSTANCE.newSucceededFuture(null)).when(this.dispatcher).createReconnectingClient(
                 Mockito.any(InetSocketAddress.class), Mockito.any(AsNumber.class),
-                Mockito.any(BGPPeerRegistry.class), Mockito.eq(this.tcpStrategyFactory), Mockito.eq(this.sessionStrategy),
-                Mockito.any(KeyMapping.class));
+                Mockito.any(BGPPeerRegistry.class), Mockito.eq(this.tcpStrategyFactory), Mockito.any(KeyMapping.class));
 
         this.ext1 = new SimpleRIBExtensionProviderContext();
         this.ext2 = new SimpleRIBExtensionProviderContext();
index 53560ace94cc660ebe936c29f0f9c62fbed801c3..40b97798f0c6410a37790f9755d630869bfce1f4 100644 (file)
@@ -64,7 +64,12 @@ public class SimpleSessionListener implements ReusableBGPPeer {
     public void releaseConnection() {
         LOG.debug("Releasing connection");
         if (this.session != null) {
-            this.session.close();
+            try {
+                this.session.close();
+            } catch (Exception e) {
+                LOG.warn("Error closing session", e);
+            }
+            this.session = null;
         }
     }
 
index 72b50d30431de7944b503772489977aef17cd218..135bf8a4a00475c4b1d2d3607ca20077eca783d2 100644 (file)
@@ -11,70 +11,45 @@ package org.opendaylight.protocol.bgp.rib.impl;
 import com.google.common.base.Optional;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import org.opendaylight.protocol.bgp.parser.spi.MessageRegistry;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
-import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.AbstractDispatcher;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
 
-public class TestClientDispatcher extends AbstractDispatcher<BGPSessionImpl, BGPSessionListener> {
-
-    private static final String NEGOTIATOR = "negotiator";
+public class TestClientDispatcher {
 
     private final BGPHandlerFactory hf;
-    private InetSocketAddress localAddress;
     private final InetSocketAddress defaulAddress;
+    private InetSocketAddress localAddress;
+    private final BGPDispatcherImpl disp;
 
     protected TestClientDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup, final MessageRegistry messageRegistry,
-            final InetSocketAddress locaAddress) {
-        super(bossGroup, workerGroup);
+                                   final InetSocketAddress locaAddress) {
+        disp = new BGPDispatcherImpl(messageRegistry, bossGroup, workerGroup) {
+            @Override
+            protected void customizeBootstrap(final Bootstrap b) {
+                b.localAddress(locaAddress);
+            }
+        };
         this.hf = new BGPHandlerFactory(messageRegistry);
         this.localAddress = locaAddress;
         this.defaulAddress = locaAddress;
     }
 
     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress remoteAddress,
-            final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy, final Optional<InetSocketAddress> localAddress) {
+                                                            final AsNumber remoteAs, final BGPPeerRegistry listener, final ReconnectStrategy strategy, final Optional<InetSocketAddress> localAddress) {
         setLocalAddress(localAddress);
-        final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, listener);
-        return super.createClient(remoteAddress, strategy, new PipelineInitializer<BGPSessionImpl>() {
-
-            @Override
-            public void initializeChannel(SocketChannel ch, Promise<BGPSessionImpl> promise) {
-                ch.pipeline().addLast(TestClientDispatcher.this.hf.getDecoders());
-                ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(null, ch, promise));
-                ch.pipeline().addLast(TestClientDispatcher.this.hf.getEncoders());
-            }
-        });
+        return disp.createClient(remoteAddress, remoteAs, listener, strategy);
     }
 
     public synchronized Future<Void> createReconnectingClient(final InetSocketAddress address,
-        final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory reconnectStrategyFactory,
-        final Optional<InetSocketAddress> localAddress) {
+                                                              final AsNumber remoteAs, final BGPPeerRegistry peerRegistry, final ReconnectStrategyFactory reconnectStrategyFactory,
+                                                              final Optional<InetSocketAddress> localAddress) {
         setLocalAddress(localAddress);
-        final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(remoteAs, peerRegistry);
-        final Future<Void> ret = super.createReconnectingClient(address, reconnectStrategyFactory, new PipelineInitializer<BGPSessionImpl>() {
-            @Override
-            public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
-                ch.pipeline().addLast(TestClientDispatcher.this.hf.getDecoders());
-                ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(null, ch, promise));
-                ch.pipeline().addLast(TestClientDispatcher.this.hf.getEncoders());
-            }
-        });
-
-        return ret;
-    }
-
-    @Override
-    protected void customizeBootstrap(Bootstrap b) {
-        b.localAddress(this.localAddress);
-        super.customizeBootstrap(b);
+        return disp.createReconnectingClient(address, remoteAs, peerRegistry, reconnectStrategyFactory, null);
     }
 
     private synchronized void setLocalAddress(final Optional<InetSocketAddress> localAddress) {
index bbfd76cc6cbd446bab3cacb326de497a04024776..71809009e7aa99ce0e1c905c2c96c951eeaeebc8 100644 (file)
             <groupId>org.opendaylight.yangtools.model</groupId>
             <artifactId>ietf-inet-types</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>protocol-framework</artifactId>
-        </dependency>
 
         <dependency>
             <groupId>com.google.code.findbugs</groupId>
index ec676c2ee1f95ea53798f7cf51af366983134914..c5e1aff7124da3e9f292528bace4a97512a87ee3 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.protocol.bgp.rib.mock;
 import com.google.common.collect.Sets;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
+import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import java.util.Set;
 import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl;
@@ -87,6 +88,61 @@ final class EventBusRegistration extends AbstractListenerRegistration<BGPSession
 
                 private static final long AS = 30L;
 
+                @Override
+                public void channelRegistered(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+                }
+
+                @Override
+                public void channelUnregistered(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+                }
+
+                @Override
+                public void channelActive(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+                }
+
+                @Override
+                public void channelInactive(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+                }
+
+                @Override
+                public void channelRead(final ChannelHandlerContext channelHandlerContext, final Object o) throws Exception {
+
+                }
+
+                @Override
+                public void channelReadComplete(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+                }
+
+                @Override
+                public void userEventTriggered(final ChannelHandlerContext channelHandlerContext, final Object o) throws Exception {
+
+                }
+
+                @Override
+                public void channelWritabilityChanged(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+                }
+
+                @Override
+                public void handlerAdded(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+                }
+
+                @Override
+                public void handlerRemoved(final ChannelHandlerContext channelHandlerContext) throws Exception {
+
+                }
+
+                @Override
+                public void exceptionCaught(final ChannelHandlerContext channelHandlerContext, final Throwable throwable) throws Exception {
+
+                }
+
                 @Override
                 public void close() {
                     LOG.debug("Session {} closed", this);
index 6684463db6cce657abfb1a619688520929ea5bc6..dea4617b96e9fa068e33aca662221af498c00524 100644 (file)
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>config-api</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>protocol-framework</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>sal-core-api</artifactId>
index f39ed39127a1ec5ccb66b848be6d95f8c1d5babe..185c1cc1e548d4a21c6f8e98984f34cf9bf9e61f 100644 (file)
@@ -7,13 +7,11 @@
  */
 package org.opendaylight.protocol.bgp.rib.spi;
 
+import io.netty.channel.ChannelInboundHandler;
 import java.util.Set;
-
-import org.opendaylight.protocol.framework.ProtocolSession;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.BgpTableType;
-import org.opendaylight.yangtools.yang.binding.Notification;
 
 /**
  * BGP Session represents the finite state machine in BGP, including timers and its purpose is to create a BGP
@@ -22,7 +20,7 @@ import org.opendaylight.yangtools.yang.binding.Notification;
  *
  * If the session is up, it has to redirect messages to/from user. Handles also malformed messages and unknown requests.
  */
-public interface BGPSession extends ProtocolSession<Notification> {
+public interface BGPSession extends AutoCloseable, ChannelInboundHandler {
     /**
      * Return the list of tables which the peer has advertised to support.
      *
index 06f138c0e4fc36174fe4e2b694f8821b5a9e142c..7a93fb218fead9c1864642f2719b5152c88259d3 100644 (file)
@@ -7,14 +7,14 @@
  */
 package org.opendaylight.protocol.bgp.rib.spi;
 
-import org.opendaylight.protocol.framework.SessionListener;
+import java.util.EventListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey;
 import org.opendaylight.yangtools.yang.binding.Notification;
 
 /**
  * Listener that receives session informations from the session.
  */
-public interface BGPSessionListener extends SessionListener<Notification, BGPSession, BGPTerminationReason> {
+public interface BGPSessionListener extends EventListener {
 
     /**
      * Returns state of BGP session associated with this listener.
@@ -29,4 +29,34 @@ public interface BGPSessionListener extends SessionListener<Notification, BGPSes
      * @param tablesKey of the table where synchronization finished
      */
     void markUptodate(final TablesKey tablesKey);
+
+    /**
+     * Fired when the session was established successfully.
+     *
+     * @param session Peer address families which we accepted
+     */
+    void onSessionUp(BGPSession session);
+    /**
+     * Fired when the session went down because of an IO error. Implementation should take care of closing underlying
+     * session.
+     *
+     * @param session that went down
+     * @param e Exception that was thrown as the cause of session being down
+     */
+
+    void onSessionDown(BGPSession session, Exception e);
+    /**
+     * Fired when the session is terminated locally. The session has already been closed and transitioned to IDLE state.
+     * Any outstanding queued messages were not sent. The user should not attempt to make any use of the session.
+     *
+     * @param reason the cause why the session went down
+     */
+    void onSessionTerminated(BGPSession session, BGPTerminationReason reason);
+
+    /**
+     * Fired when a normal protocol message is received.
+     *
+     * @param notification Protocol message
+     */
+    void onMessage(BGPSession session, Notification notification);
 }
diff --git a/bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/BGPSessionNegotiatorFactory.java b/bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/BGPSessionNegotiatorFactory.java
new file mode 100644 (file)
index 0000000..5b8d6cb
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.spi;
+
+import io.netty.channel.Channel;
+import io.netty.util.concurrent.Promise;
+
+public interface BGPSessionNegotiatorFactory<S extends BGPSession> {
+    SessionNegotiator getSessionNegotiator(Channel channel, Promise<S> promise);
+}
\ No newline at end of file
index 694bd8c603830cb90ddf26ba5363c493185e7393..af9e2ca6cd6c4d40c0648f7b89ab2165323a61d0 100644 (file)
@@ -9,16 +9,14 @@ package org.opendaylight.protocol.bgp.rib.spi;
 
 import com.google.common.base.MoreObjects;
 import org.opendaylight.protocol.bgp.parser.BGPError;
-import org.opendaylight.protocol.framework.TerminationReason;
 
-public final class BGPTerminationReason implements TerminationReason {
+public final class BGPTerminationReason {
     private final BGPError error;
 
     public BGPTerminationReason(final BGPError error) {
         this.error = error;
     }
 
-    @Override
     public String getErrorMessage() {
         return error.toString();
     }
diff --git a/bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/SessionNegotiator.java b/bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/SessionNegotiator.java
new file mode 100644 (file)
index 0000000..e1ab25e
--- /dev/null
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.spi;
+
+import io.netty.channel.ChannelInboundHandler;
+
+public interface SessionNegotiator extends ChannelInboundHandler {
+}
index 6e981cd5ec27268e2ec4191034bcb5f2785a0c2f..84e2831656c92ff50964c56eb26d3b7befdec011 100644 (file)
@@ -34,7 +34,11 @@ public class TestingListener implements ReusableBGPPeer {
     @Override
     public void onSessionDown(final BGPSession session, final Exception e) {
         LOG.info("Client Listener: Connection lost.");
-        session.close();
+        try {
+            session.close();
+        } catch (Exception ie) {
+            LOG.warn("Error closing session", ie);
+        }
     }
 
     @Override
index 280788c53b03f89a571579f4b648fa1b480aa28a..80a8336b1672f777391861f127919a5e586e5d49 100644 (file)
@@ -9,15 +9,14 @@ package org.opendaylight.protocol.bgp.testtool;
 
 import com.google.common.base.Preconditions;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
 import org.opendaylight.protocol.bgp.parser.spi.pojo.ServiceLoaderBGPExtensionProviderContext;
+import org.opendaylight.protocol.bgp.rib.impl.BGPDispatcherImpl;
 import org.opendaylight.protocol.bgp.rib.impl.BGPHandlerFactory;
 import org.opendaylight.protocol.bgp.rib.impl.BGPServerSessionNegotiatorFactory;
 import org.opendaylight.protocol.bgp.rib.impl.BGPSessionImpl;
@@ -27,10 +26,6 @@ import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionValidator;
 import org.opendaylight.protocol.bgp.rib.impl.spi.ReusableBGPPeer;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
-import org.opendaylight.protocol.framework.AbstractDispatcher;
-import org.opendaylight.protocol.framework.ProtocolSession;
-import org.opendaylight.protocol.framework.SessionListener;
-import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
@@ -41,44 +36,30 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.type
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv4AddressFamily;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.SubsequentAddressFamily;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
-import org.opendaylight.yangtools.yang.binding.Notification;
 
-public class BGPSpeakerMock<M, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends AbstractDispatcher<S, L> {
+public class BGPSpeakerMock {
 
-    private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
+    private final BGPServerSessionNegotiatorFactory negotiatorFactory;
     private final BGPHandlerFactory factory;
+    private final BGPDispatcherImpl disp;
+    private final BGPPeerRegistry peerRegistry;
+    private final Map<Class<? extends AddressFamily>, Class<? extends SubsequentAddressFamily>> tables;
 
-    public BGPSpeakerMock(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final BGPHandlerFactory factory,
-        final DefaultPromise<BGPSessionImpl> defaultPromise) {
-        super(GlobalEventExecutor.INSTANCE, new NioEventLoopGroup(), new NioEventLoopGroup());
+    private BGPSpeakerMock(final BGPServerSessionNegotiatorFactory negotiatorFactory, final BGPHandlerFactory factory,
+                           final DefaultPromise<BGPSessionImpl> defaultPromise) {
+        disp = new BGPDispatcherImpl(null, new NioEventLoopGroup(), new NioEventLoopGroup());
         this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
         this.factory = Preconditions.checkNotNull(factory);
-    }
 
-    public void createServer(final InetSocketAddress address) {
-        super.createServer(address, new PipelineInitializer<S>() {
 
+        peerRegistry = new BGPPeerRegistry() {
             @Override
-            public void initializeChannel(final SocketChannel ch, final Promise<S> promise) {
-                ch.pipeline().addLast(BGPSpeakerMock.this.factory.getDecoders());
-                ch.pipeline().addLast("negotiator",
-                    BGPSpeakerMock.this.negotiatorFactory.getSessionNegotiator(null, ch, promise));
-                ch.pipeline().addLast(BGPSpeakerMock.this.factory.getEncoders());
+            public void addPeer(final IpAddress ip, final ReusableBGPPeer peer, final BGPSessionPreferences prefs) {
             }
-        });
-    }
-
-    public static void main(final String[] args) throws Exception {
-        final Map<Class<? extends AddressFamily>, Class<? extends SubsequentAddressFamily>> tables = new HashMap<>();
-        tables.put(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
-        tables.put(LinkstateAddressFamily.class, LinkstateSubsequentAddressFamily.class);
 
-        final BGPPeerRegistry peerRegistry = new BGPPeerRegistry() {
             @Override
-            public void addPeer(final IpAddress ip, final ReusableBGPPeer peer, final BGPSessionPreferences prefs) {}
-
-            @Override
-            public void removePeer(final IpAddress ip) {}
+            public void removePeer(final IpAddress ip) {
+            }
 
             @Override
             public boolean isPeerConfigured(final IpAddress ip) {
@@ -101,18 +82,35 @@ public class BGPSpeakerMock<M, S extends ProtocolSession<M>, L extends SessionLi
             }
 
             @Override
-            public void removePeerSession(final IpAddress ip) {}
+            public void removePeerSession(final IpAddress ip) {
+            }
         };
 
-        final SessionNegotiatorFactory<Notification, BGPSessionImpl, BGPSessionListener> snf = new BGPServerSessionNegotiatorFactory(new BGPSessionValidator() {
+        tables = new HashMap<>();
+        tables.put(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
+        tables.put(LinkstateAddressFamily.class, LinkstateSubsequentAddressFamily.class);
+    }
+
+    public void main(final String[] args) {
+
+        final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(new BGPSessionValidator() {
             @Override
             public void validate(final Open openObj, final BGPSessionPreferences prefs) throws BGPDocumentedException {
                 // NOOP
             }
         }, peerRegistry);
 
-        final BGPSpeakerMock<Notification, BGPSessionImpl, BGPSessionListener> mock = new BGPSpeakerMock<>(snf, new BGPHandlerFactory(ServiceLoaderBGPExtensionProviderContext.getSingletonInstance().getMessageRegistry()), new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
+        final BGPSpeakerMock mock = new BGPSpeakerMock(snf, new BGPHandlerFactory(ServiceLoaderBGPExtensionProviderContext.getSingletonInstance().getMessageRegistry()), new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
 
         mock.createServer(new InetSocketAddress("127.0.0.2", 12345));
     }
+
+    private void createServer(final InetSocketAddress address) {
+        disp.createServer(peerRegistry,address, new BGPSessionValidator() {
+            @Override
+            public void validate(final Open openObj, final BGPSessionPreferences prefs) throws BGPDocumentedException {
+                // NOOP
+            }
+        });
+    }
 }
index bf49d1082a75e149a553765207b9284c44500720..40567ea160166b512f38b84fbfb3cded5989b819 100644 (file)
@@ -31,7 +31,11 @@ public class SpeakerSessionListener implements BGPSessionListener {
     @Override
     public void onSessionDown(final BGPSession session, final Exception e) {
         LOG.info("Server: Session down.");
-        session.close();
+        try {
+            session.close();
+        } catch (Exception ie) {
+            LOG.warn("Error closing session", ie);
+        }
         // this.d.stop();
     }
 
index 6da65fc2d1a973145680f220283836c31b3cc4f8..bfebb36b848785967436d04e7c26ebfb44ded50b 100644 (file)
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>sal-binding-config</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.opendaylight.controller</groupId>
-            <artifactId>protocol-framework</artifactId>
-        </dependency>
 
         <dependency>
             <groupId>io.netty</groupId>